diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java index 37e86bec4d8..266c6a27480 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/AdaptiveLifoCoDelCallQueue.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.hbase.classification.InterfaceAudience; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; /** * Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading. @@ -77,7 +78,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { private volatile long minDelay; // the moment when current interval ends - private volatile long intervalTime = System.currentTimeMillis(); + private volatile long intervalTime = EnvironmentEdgeManager.currentTime(); // switch to ensure only one threads does interval cutoffs private AtomicBoolean resetDelay = new AtomicBoolean(true); @@ -147,7 +148,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue { * and internal queue state (deemed overloaded). */ private boolean needToDrop(CallRunner callRunner) { - long now = System.currentTimeMillis(); + long now = EnvironmentEdgeManager.currentTime(); long callDelay = now - callRunner.getCall().timestamp; long localMinDelay = this.minDelay; diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java index 0cd34bb1e21..431aeebf766 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java @@ -203,7 +203,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs } else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) { Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval, codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches}; - callExecutor = new RWQueueRpcExecutor("B.default", handlerCount, + callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount, numCallQueues, callqReadShare, callqScanShare, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs, AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java index 1229ea22ee6..aae12036cdc 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java @@ -16,41 +16,6 @@ * limitations under the License. */ package org.apache.hadoop.hbase.ipc; - -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; -import com.google.common.collect.ImmutableSet; -import com.google.common.collect.Maps; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import com.google.protobuf.Message; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseConfiguration; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; -import org.apache.hadoop.hbase.security.User; -import org.apache.hadoop.hbase.testclassification.SmallTests; -import org.apache.hadoop.hbase.ipc.RpcServer.Call; -import org.apache.hadoop.hbase.protobuf.RequestConverter; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; -import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.util.Bytes; -import org.apache.hadoop.hbase.util.Threads; -import org.junit.Before; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.mockito.invocation.InvocationOnMock; -import org.mockito.stubbing.Answer; - -import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotEquals; @@ -64,6 +29,55 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.io.IOException; +import java.net.InetSocketAddress; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.LinkedBlockingQueue; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HBaseConfiguration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.client.Put; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.ipc.RpcServer.Call; +import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl; +import org.apache.hadoop.hbase.protobuf.RequestConverter; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos; +import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader; +import org.apache.hadoop.hbase.security.User; +import org.apache.hadoop.hbase.testclassification.RPCTests; +import org.apache.hadoop.hbase.testclassification.SmallTests; +import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.EnvironmentEdge; +import org.apache.hadoop.hbase.util.EnvironmentEdgeManager; +import org.apache.hadoop.hbase.util.Threads; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.common.collect.Maps; +import com.google.protobuf.Message; + @Category(SmallTests.class) public class TestSimpleRpcScheduler { private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class); @@ -212,7 +226,7 @@ public class TestSimpleRpcScheduler { scheduler.dispatch(smallCallTask); while (work.size() < 8) { - Threads.sleepWithoutInterrupt(100); + Thread.sleep(100); } int seqSum = 0; @@ -292,7 +306,7 @@ public class TestSimpleRpcScheduler { scheduler.dispatch(scanCallTask); while (work.size() < 6) { - Threads.sleepWithoutInterrupt(100); + Thread.sleep(100); } for (int i = 0; i < work.size() - 2; i += 3) { @@ -320,6 +334,13 @@ public class TestSimpleRpcScheduler { }).when(callTask).run(); } + private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler) + throws InterruptedException { + while (scheduler.getGeneralQueueLength() > 0) { + Thread.sleep(100); + } + } + @Test public void testSoftAndHardQueueLimits() throws Exception { Configuration schedConf = HBaseConfiguration.create(); @@ -348,9 +369,7 @@ public class TestSimpleRpcScheduler { schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0); scheduler.onConfigurationChange(schedConf); assertFalse(scheduler.dispatch(putCallTask)); - while (scheduler.getGeneralQueueLength() > 0) { - Threads.sleepWithoutInterrupt(100); - } + waitUntilQueueEmpty(scheduler); schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1); scheduler.onConfigurationChange(schedConf); assertTrue(scheduler.dispatch(putCallTask)); @@ -359,8 +378,30 @@ public class TestSimpleRpcScheduler { } } + private static final class CoDelEnvironmentEdge implements EnvironmentEdge { + + private final BlockingQueue timeQ = new LinkedBlockingQueue<>(); + + private long offset; + + private final Set threadNamePrefixs = new HashSet<>(); + + @Override + public long currentTime() { + for (String threadNamePrefix : threadNamePrefixs) { + if (Thread.currentThread().getName().startsWith(threadNamePrefix)) { + return timeQ.poll().longValue() + offset; + } + } + return System.currentTimeMillis(); + } + } + @Test public void testCoDelScheduling() throws Exception { + CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge(); + envEdge.threadNamePrefixs.add("RW.default"); + envEdge.threadNamePrefixs.add("B.default"); Configuration schedConf = HBaseConfiguration.create(); schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY, @@ -373,36 +414,51 @@ public class TestSimpleRpcScheduler { HConstants.QOS_THRESHOLD); try { scheduler.start(); - + EnvironmentEdgeManager.injectEdge(envEdge); + envEdge.offset = 5; // calls faster than min delay for (int i = 0; i < 100; i++) { - CallRunner cr = getMockedCallRunner(); + long time = System.currentTimeMillis(); + envEdge.timeQ.put(time); + CallRunner cr = getMockedCallRunner(time); Thread.sleep(5); scheduler.dispatch(cr); } - Thread.sleep(100); // make sure fast calls are handled + // make sure fast calls are handled + waitUntilQueueEmpty(scheduler); + Thread.sleep(100); assertEquals("None of these calls should have been discarded", 0, scheduler.getNumGeneralCallsDropped()); + envEdge.offset = 6; // calls slower than min delay, but not individually slow enough to be dropped for (int i = 0; i < 20; i++) { - CallRunner cr = getMockedCallRunner(); + long time = System.currentTimeMillis(); + envEdge.timeQ.put(time); + CallRunner cr = getMockedCallRunner(time); Thread.sleep(6); scheduler.dispatch(cr); } - Thread.sleep(100); // make sure somewhat slow calls are handled + // make sure somewhat slow calls are handled + waitUntilQueueEmpty(scheduler); + Thread.sleep(100); assertEquals("None of these calls should have been discarded", 0, scheduler.getNumGeneralCallsDropped()); + envEdge.offset = 12; // now slow calls and the ones to be dropped for (int i = 0; i < 20; i++) { - CallRunner cr = getMockedCallRunner(); + long time = System.currentTimeMillis(); + envEdge.timeQ.put(time); + CallRunner cr = getMockedCallRunner(time); Thread.sleep(12); scheduler.dispatch(cr); } - Thread.sleep(100); // make sure somewhat slow calls are handled + // make sure somewhat slow calls are handled + waitUntilQueueEmpty(scheduler); + Thread.sleep(100); assertTrue("There should have been at least 12 calls dropped", scheduler.getNumGeneralCallsDropped() > 12); } finally { @@ -410,7 +466,7 @@ public class TestSimpleRpcScheduler { } } - private CallRunner getMockedCallRunner() throws IOException { + private CallRunner getMockedCallRunner(long timestamp) throws IOException { CallRunner putCallTask = mock(CallRunner.class); RpcServer.Call putCall = mock(RpcServer.Call.class); putCall.param = RequestConverter.buildMutateRequest( @@ -418,7 +474,7 @@ public class TestSimpleRpcScheduler { RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build(); when(putCallTask.getCall()).thenReturn(putCall); when(putCall.getHeader()).thenReturn(putHead); - putCall.timestamp = System.currentTimeMillis(); + putCall.timestamp = timestamp; return putCallTask; } }