HBASE-26819 Minor code cleanup in and around RpcScheduler
Signed-off-by: Duo Zhang <zhangduo@apache.org>
This commit is contained in:
parent
068031ea82
commit
5851400a46
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -17,7 +17,6 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
@ -26,11 +25,10 @@ import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -92,7 +90,7 @@ public class FifoRpcScheduler extends RpcScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
|
public boolean dispatch(final CallRunner task) {
|
||||||
return executeRpcCall(executor, queueSize, task);
|
return executeRpcCall(executor, queueSize, task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -17,20 +17,18 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.concurrent.ArrayBlockingQueue;
|
import java.util.concurrent.ArrayBlockingQueue;
|
||||||
import java.util.concurrent.ThreadPoolExecutor;
|
import java.util.concurrent.ThreadPoolExecutor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
import org.apache.hbase.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A special {@code }RpcScheduler} only used for master. This scheduler separates RegionServerReport
|
* A special {@code }RpcScheduler} only used for master. This scheduler separates RegionServerReport
|
||||||
|
@ -90,7 +88,7 @@ public class MasterFifoRpcScheduler extends FifoRpcScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(final CallRunner task) throws IOException, InterruptedException {
|
public boolean dispatch(final CallRunner task) {
|
||||||
String method = getCallMethod(task);
|
String method = getCallMethod(task);
|
||||||
if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) {
|
if (rsReportExecutor != null && method != null && method.equals(REGION_SERVER_REPORT)) {
|
||||||
return executeRpcCall(rsReportExecutor, rsReportQueueSize, task);
|
return executeRpcCall(rsReportExecutor, rsReportQueueSize, task);
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -17,12 +17,10 @@
|
||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An interface for RPC request scheduling algorithm.
|
* An interface for RPC request scheduling algorithm.
|
||||||
|
@ -65,7 +63,7 @@ public abstract class RpcScheduler {
|
||||||
*
|
*
|
||||||
* @param task the request to be dispatched
|
* @param task the request to be dispatched
|
||||||
*/
|
*/
|
||||||
public abstract boolean dispatch(CallRunner task) throws IOException, InterruptedException;
|
public abstract boolean dispatch(CallRunner task);
|
||||||
|
|
||||||
/** Get call queue information **/
|
/** Get call queue information **/
|
||||||
public abstract CallQueueInfo getCallQueueInfo();
|
public abstract CallQueueInfo getCallQueueInfo();
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -21,10 +21,10 @@ import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.Abortable;
|
import org.apache.hadoop.hbase.Abortable;
|
||||||
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.yetus.audience.InterfaceAudience;
|
|
||||||
import org.apache.yetus.audience.InterfaceStability;
|
|
||||||
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
|
import org.apache.hadoop.hbase.master.MasterAnnotationReadingPriorityFunction;
|
||||||
|
import org.apache.yetus.audience.InterfaceAudience;
|
||||||
|
import org.apache.yetus.audience.InterfaceStability;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
|
* The default scheduler. Configurable. Maintains isolated handler pools for general ('default'),
|
||||||
|
@ -198,7 +198,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(CallRunner callTask) throws InterruptedException {
|
public boolean dispatch(CallRunner callTask) {
|
||||||
RpcCall call = callTask.getRpcCall();
|
RpcCall call = callTask.getRpcCall();
|
||||||
int level = priority.getPriority(call.getHeader(), call.getParam(),
|
int level = priority.getPriority(call.getHeader(), call.getParam(),
|
||||||
call.getRequestUser().orElse(null));
|
call.getRequestUser().orElse(null));
|
||||||
|
|
|
@ -27,13 +27,11 @@ import static org.mockito.Mockito.mock;
|
||||||
import static org.mockito.Mockito.reset;
|
import static org.mockito.Mockito.reset;
|
||||||
import static org.mockito.Mockito.times;
|
import static org.mockito.Mockito.times;
|
||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Random;
|
import java.util.Random;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.hbase.client.Connection;
|
import org.apache.hadoop.hbase.client.Connection;
|
||||||
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
import org.apache.hadoop.hbase.client.ConnectionFactory;
|
||||||
|
@ -65,7 +63,6 @@ import org.junit.experimental.categories.Category;
|
||||||
import org.junit.rules.TestName;
|
import org.junit.rules.TestName;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
import org.apache.hbase.thirdparty.com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -488,7 +485,7 @@ public class TestMetaTableAccessor {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
|
public boolean dispatch(CallRunner task) {
|
||||||
int priority = task.getRpcCall().getPriority();
|
int priority = task.getRpcCall().getPriority();
|
||||||
|
|
||||||
if (priority > HConstants.QOS_THRESHOLD) {
|
if (priority > HConstants.QOS_THRESHOLD) {
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -20,7 +20,6 @@ package org.apache.hadoop.hbase.client;
|
||||||
import static org.junit.Assert.assertArrayEquals;
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
@ -53,7 +52,6 @@ import org.junit.BeforeClass;
|
||||||
import org.junit.ClassRule;
|
import org.junit.ClassRule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.junit.experimental.categories.Category;
|
import org.junit.experimental.categories.Category;
|
||||||
|
|
||||||
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
import org.apache.hbase.thirdparty.com.google.common.io.Closeables;
|
||||||
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
import org.apache.hbase.thirdparty.com.google.protobuf.Descriptors.MethodDescriptor;
|
||||||
|
|
||||||
|
@ -90,7 +88,7 @@ public class TestAsyncClientPauseForCallQueueTooBig {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(CallRunner callTask) throws InterruptedException {
|
public boolean dispatch(CallRunner callTask) {
|
||||||
if (FAIL) {
|
if (FAIL) {
|
||||||
MethodDescriptor method = callTask.getRpcCall().getMethod();
|
MethodDescriptor method = callTask.getRpcCall().getMethod();
|
||||||
// this is for test scan, where we will send a open scanner first and then a next, and we
|
// this is for test scan, where we will send a open scanner first and then a next, and we
|
||||||
|
|
|
@ -1,4 +1,4 @@
|
||||||
/**
|
/*
|
||||||
* Licensed to the Apache Software Foundation (ASF) under one
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
* or more contributor license agreements. See the NOTICE file
|
* or more contributor license agreements. See the NOTICE file
|
||||||
* distributed with this work for additional information
|
* distributed with this work for additional information
|
||||||
|
@ -18,8 +18,6 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
|
|
||||||
public class DelegatingRpcScheduler extends RpcScheduler {
|
public class DelegatingRpcScheduler extends RpcScheduler {
|
||||||
protected RpcScheduler delegate;
|
protected RpcScheduler delegate;
|
||||||
|
|
||||||
|
@ -75,7 +73,7 @@ public class DelegatingRpcScheduler extends RpcScheduler {
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(CallRunner task) throws IOException, InterruptedException {
|
public boolean dispatch(CallRunner task) {
|
||||||
return delegate.dispatch(task);
|
return delegate.dispatch(task);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue