HBASE-15306 Make RPC call queue length dynamically configurable
This commit is contained in:
parent
d747188f2c
commit
bcbe174a27
|
@ -66,6 +66,10 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||||
|
|
||||||
protected void initializeQueues(final int numQueues,
|
protected void initializeQueues(final int numQueues,
|
||||||
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
final Class<? extends BlockingQueue> queueClass, Object... initargs) {
|
||||||
|
if (initargs.length > 0) {
|
||||||
|
currentQueueLimit = (int) initargs[0];
|
||||||
|
initargs[0] = Math.max((int) initargs[0], DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||||
|
}
|
||||||
for (int i = 0; i < numQueues; ++i) {
|
for (int i = 0; i < numQueues; ++i) {
|
||||||
queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
|
queues.add((BlockingQueue<CallRunner>) ReflectionUtils.newInstance(queueClass, initargs));
|
||||||
}
|
}
|
||||||
|
@ -74,7 +78,12 @@ public class BalancedQueueRpcExecutor extends RpcExecutor {
|
||||||
@Override
|
@Override
|
||||||
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
public boolean dispatch(final CallRunner callTask) throws InterruptedException {
|
||||||
int queueIndex = balancer.getNextQueue();
|
int queueIndex = balancer.getNextQueue();
|
||||||
return queues.get(queueIndex).offer(callTask);
|
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
|
||||||
|
// that means we can overflow by at most <num reader> size (5), that's ok
|
||||||
|
if (queue.size() >= currentQueueLimit) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return queue.offer(callTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
|
|
@ -139,12 +139,22 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
|
" readQueues=" + numReadQueues + " readHandlers=" + readHandlersCount +
|
||||||
((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
|
((numScanQueues == 0) ? "" : " scanQueues=" + numScanQueues +
|
||||||
" scanHandlers=" + scanHandlersCount));
|
" scanHandlers=" + scanHandlersCount));
|
||||||
|
if (writeQueueInitArgs.length > 0) {
|
||||||
|
currentQueueLimit = (int) writeQueueInitArgs[0];
|
||||||
|
writeQueueInitArgs[0] = Math.max((int) writeQueueInitArgs[0],
|
||||||
|
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||||
|
}
|
||||||
for (int i = 0; i < numWriteQueues; ++i) {
|
for (int i = 0; i < numWriteQueues; ++i) {
|
||||||
|
|
||||||
queues.add((BlockingQueue<CallRunner>)
|
queues.add((BlockingQueue<CallRunner>)
|
||||||
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
|
ReflectionUtils.newInstance(writeQueueClass, writeQueueInitArgs));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (readQueueInitArgs.length > 0) {
|
||||||
|
currentQueueLimit = (int) readQueueInitArgs[0];
|
||||||
|
readQueueInitArgs[0] = Math.max((int) readQueueInitArgs[0],
|
||||||
|
DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT);
|
||||||
|
}
|
||||||
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
|
for (int i = 0; i < (numReadQueues + numScanQueues); ++i) {
|
||||||
queues.add((BlockingQueue<CallRunner>)
|
queues.add((BlockingQueue<CallRunner>)
|
||||||
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
|
ReflectionUtils.newInstance(readQueueClass, readQueueInitArgs));
|
||||||
|
@ -170,7 +180,12 @@ public class RWQueueRpcExecutor extends RpcExecutor {
|
||||||
} else {
|
} else {
|
||||||
queueIndex = numWriteQueues + readBalancer.getNextQueue();
|
queueIndex = numWriteQueues + readBalancer.getNextQueue();
|
||||||
}
|
}
|
||||||
return queues.get(queueIndex).offer(callTask);
|
|
||||||
|
BlockingQueue<CallRunner> queue = queues.get(queueIndex);
|
||||||
|
if (queue.size() >= currentQueueLimit) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return queue.offer(callTask);
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
private boolean isWriteRequest(final RequestHeader header, final Message param) {
|
||||||
|
|
|
@ -42,6 +42,9 @@ import com.google.common.base.Strings;
|
||||||
public abstract class RpcExecutor {
|
public abstract class RpcExecutor {
|
||||||
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
|
private static final Log LOG = LogFactory.getLog(RpcExecutor.class);
|
||||||
|
|
||||||
|
protected static final int DEFAULT_CALL_QUEUE_SIZE_HARD_LIMIT = 250;
|
||||||
|
protected volatile int currentQueueLimit;
|
||||||
|
|
||||||
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
private final AtomicInteger activeHandlerCount = new AtomicInteger(0);
|
||||||
private final List<Thread> handlers;
|
private final List<Thread> handlers;
|
||||||
private final int handlerCount;
|
private final int handlerCount;
|
||||||
|
@ -210,4 +213,12 @@ public abstract class RpcExecutor {
|
||||||
return ThreadLocalRandom.current().nextInt(queueSize);
|
return ThreadLocalRandom.current().nextInt(queueSize);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update current soft limit for executor's call queues
|
||||||
|
* @param conf updated configuration
|
||||||
|
*/
|
||||||
|
public void resizeQueues(Configuration conf) {
|
||||||
|
currentQueueLimit = conf.getInt("hbase.ipc.server.max.callqueue.length", currentQueueLimit);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -2090,6 +2090,9 @@ public class RpcServer implements RpcServerInterface, ConfigurationObserver {
|
||||||
@Override
|
@Override
|
||||||
public void onConfigurationChange(Configuration newConf) {
|
public void onConfigurationChange(Configuration newConf) {
|
||||||
initReconfigurable(newConf);
|
initReconfigurable(newConf);
|
||||||
|
if (scheduler instanceof ConfigurationObserver) {
|
||||||
|
((ConfigurationObserver)scheduler).onConfigurationChange(newConf);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void initReconfigurable(Configuration confToLoad) {
|
private void initReconfigurable(Configuration confToLoad) {
|
||||||
|
|
|
@ -28,6 +28,7 @@ import org.apache.hadoop.hbase.HBaseInterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
import org.apache.hadoop.hbase.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.hbase.conf.ConfigurationObserver;
|
||||||
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -36,7 +37,7 @@ import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
@InterfaceAudience.LimitedPrivate({HBaseInterfaceAudience.COPROC, HBaseInterfaceAudience.PHOENIX})
|
||||||
@InterfaceStability.Evolving
|
@InterfaceStability.Evolving
|
||||||
public class SimpleRpcScheduler extends RpcScheduler {
|
public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObserver {
|
||||||
private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
|
private static final Log LOG = LogFactory.getLog(SimpleRpcScheduler.class);
|
||||||
|
|
||||||
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
|
public static final String CALL_QUEUE_READ_SHARE_CONF_KEY =
|
||||||
|
@ -55,6 +56,21 @@ public class SimpleRpcScheduler extends RpcScheduler {
|
||||||
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
|
public static final String QUEUE_MAX_CALL_DELAY_CONF_KEY
|
||||||
= "hbase.ipc.server.queue.max.call.delay";
|
= "hbase.ipc.server.queue.max.call.delay";
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Resize call queues;
|
||||||
|
* @param conf new configuration
|
||||||
|
*/
|
||||||
|
@Override
|
||||||
|
public void onConfigurationChange(Configuration conf) {
|
||||||
|
callExecutor.resizeQueues(conf);
|
||||||
|
if (priorityExecutor != null) {
|
||||||
|
priorityExecutor.resizeQueues(conf);
|
||||||
|
}
|
||||||
|
if (replicationExecutor != null) {
|
||||||
|
replicationExecutor.resizeQueues(conf);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
|
* Comparator used by the "normal callQueue" if DEADLINE_CALL_QUEUE_CONF_KEY is set to true.
|
||||||
* It uses the calculated "deadline" e.g. to deprioritize long-running job
|
* It uses the calculated "deadline" e.g. to deprioritize long-running job
|
||||||
|
|
|
@ -52,7 +52,9 @@ import java.util.Map;
|
||||||
import java.util.concurrent.CountDownLatch;
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.mockito.Matchers.any;
|
import static org.mockito.Matchers.any;
|
||||||
import static org.mockito.Matchers.anyObject;
|
import static org.mockito.Matchers.anyObject;
|
||||||
import static org.mockito.Matchers.eq;
|
import static org.mockito.Matchers.eq;
|
||||||
|
@ -317,4 +319,41 @@ public class TestSimpleRpcScheduler {
|
||||||
}
|
}
|
||||||
}).when(callTask).run();
|
}).when(callTask).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testSoftAndHardQueueLimits() throws Exception {
|
||||||
|
Configuration schedConf = HBaseConfiguration.create();
|
||||||
|
|
||||||
|
schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 0);
|
||||||
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
|
||||||
|
|
||||||
|
PriorityFunction priority = mock(PriorityFunction.class);
|
||||||
|
when(priority.getPriority(any(RequestHeader.class), any(Message.class),
|
||||||
|
any(User.class))).thenReturn(HConstants.NORMAL_QOS);
|
||||||
|
SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, priority,
|
||||||
|
HConstants.QOS_THRESHOLD);
|
||||||
|
try {
|
||||||
|
scheduler.start();
|
||||||
|
|
||||||
|
CallRunner putCallTask = mock(CallRunner.class);
|
||||||
|
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
||||||
|
putCall.param = RequestConverter.buildMutateRequest(
|
||||||
|
Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
|
||||||
|
RequestHeader putHead = RequestHeader.newBuilder().setMethodName("mutate").build();
|
||||||
|
when(putCallTask.getCall()).thenReturn(putCall);
|
||||||
|
when(putCall.getHeader()).thenReturn(putHead);
|
||||||
|
|
||||||
|
assertTrue(scheduler.dispatch(putCallTask));
|
||||||
|
|
||||||
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
|
||||||
|
scheduler.onConfigurationChange(schedConf);
|
||||||
|
assertFalse(scheduler.dispatch(putCallTask));
|
||||||
|
|
||||||
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
|
||||||
|
scheduler.onConfigurationChange(schedConf);
|
||||||
|
assertTrue(scheduler.dispatch(putCallTask));
|
||||||
|
} finally {
|
||||||
|
scheduler.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue