HBASE-23744 - FastPathBalancedQueueRpcExecutor should enforce queue length of 0

Closes #1094

Co-authored-by: Viraj Jasani <vjasani@apache.org>

Signed-off-by: Xu Cang <xucang@apache.org>
Signed-off-by: Nick Dimiduk <ndimiduk@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
This commit is contained in:
Geoffrey Jacoby 2020-07-13 12:08:59 +05:30 committed by Viraj Jasani
parent 15c20be6ff
commit be151e3fce
No known key found for this signature in database
GPG Key ID: B3D6C0B41C8ADFD5
2 changed files with 32 additions and 0 deletions

View File

@ -65,6 +65,11 @@ public class FastPathBalancedQueueRpcExecutor extends BalancedQueueRpcExecutor {
@Override @Override
public boolean dispatch(CallRunner callTask) throws InterruptedException { public boolean dispatch(CallRunner callTask) throws InterruptedException {
//FastPathHandlers don't check queue limits, so if we're completely shut down
//we have to prevent ourselves from using the handler in the first place
if (currentQueueLimit == 0){
return false;
}
FastPathHandler handler = popReadyHandler(); FastPathHandler handler = popReadyHandler();
return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask); return handler != null? handler.loadCallRunner(callTask): super.dispatch(callTask);
} }

View File

@ -39,10 +39,12 @@ import java.util.Set;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Abortable;
import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Put;
@ -59,8 +61,11 @@ import org.apache.hadoop.hbase.util.EnvironmentEdge;
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
import org.apache.hadoop.hbase.util.Threads; import org.apache.hadoop.hbase.util.Threads;
import org.junit.Before; import org.junit.Before;
import org.junit.Rule;
import org.junit.Test; import org.junit.Test;
import org.junit.experimental.categories.Category; import org.junit.experimental.categories.Category;
import org.junit.rules.TestName;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock; import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer; import org.mockito.stubbing.Answer;
@ -74,6 +79,9 @@ import com.google.protobuf.Message;
public class TestSimpleRpcScheduler { public class TestSimpleRpcScheduler {
private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class); private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
@Rule
public TestName testName = new TestName();
private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() { private final RpcScheduler.Context CONTEXT = new RpcScheduler.Context() {
@Override @Override
public InetSocketAddress getListenerAddress() { public InetSocketAddress getListenerAddress() {
@ -475,6 +483,25 @@ public class TestSimpleRpcScheduler {
} }
} }
@Test
public void testFastPathBalancedQueueRpcExecutorWithQueueLength0() throws Exception {
String name = testName.getMethodName();
int handlerCount = 1;
String callQueueType = RpcExecutor.CALL_QUEUE_TYPE_CODEL_CONF_VALUE;
int maxQueueLength = 0;
PriorityFunction priority = mock(PriorityFunction.class);
Configuration conf = HBaseConfiguration.create();
Abortable abortable = mock(Abortable.class);
FastPathBalancedQueueRpcExecutor executor =
Mockito.spy(new FastPathBalancedQueueRpcExecutor(name,
handlerCount, callQueueType, maxQueueLength, priority, conf, abortable));
CallRunner task = mock(CallRunner.class);
assertFalse(executor.dispatch(task));
//make sure we never internally get a handler, which would skip the queue validation
Mockito.verify(executor, Mockito.never()).getHandler(Mockito.anyString(), Mockito.anyDouble(),
(BlockingQueue<CallRunner>) Mockito.any(), (AtomicInteger) Mockito.any());
}
// Get mocked call that has the CallRunner sleep for a while so that the fast // Get mocked call that has the CallRunner sleep for a while so that the fast
// path isn't hit. // path isn't hit.
private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException { private CallRunner getMockedCallRunner(long timestamp, final long sleepTime) throws IOException {