diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java index 1922f901b3c..724d828f416 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/FastPathBalancedQueueRpcExecutor.java @@ -65,6 +65,11 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor { @Override public boolean dispatch(CallRunner callTask) throws InterruptedException { + //FastPathHandlers don't check queue limits, so if we're completely shut down + //we have to prevent ourselves from using the handler in the first place + if (currentQueueLimit == 0){ + return false; + } FastPathHandler handler = popReadyHandler(); return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); } diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 940f1c22762..d18e16784ea 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -39,10 +39,12 @@ import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.Abortable; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.client.Put; @@ -59,8 +61,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.Threads; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.experimental.categories.Category; +import org.junit.rules.TestName; +import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -74,6 +79,9 @@ import com.google.protobuf.Message; public class TestSimpleRpcScheduler { private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class); + @Rule + public TestName testName = new TestName(); + private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { @Override public InetSocketAddress getListenerAddress() { @@ -475,6 +483,25 @@ public class TestSimpleRpcScheduler { } } + @Test + public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception { + String name = testName.getMethodName(); + int handlerCount = 1; + String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE; + int maxQueueLength = 0; + PriorityFunction priority = mock(PriorityFunction.class); + Configuration conf = HBaseConfiguration.create(); + Abortable abortable = mock(Abortable.class); + FastPathBalancedQueueRpcExecutor executor = + Mockito.spy(new FastPathBalancedQueueRpcExecutor(name, + handlerCount, callQueueType, maxQueueLength, priority, conf, abortable)); + CallRunner task = mock(CallRunner.class); + assertFalse(executor.dispatch(task)); + //make sure we never internally get a handler, which would skip the queue validation + Mockito.verify(executor, Mockito.never()).getHandler(Mockito.anyString(), Mockito.anyDouble(), + (BlockingQueue) Mockito.any(), (AtomicInteger) Mockito.any()); + } + // Get mocked call that has the CallRunner sleep for a while so that the fast // path isn't hit. private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {