HBASE-15360 addendum fix testCoDelScheduling
Signed-off-by: stack <stack@apache.org>
This commit is contained in:
parent
000117ad9f
commit
2348478506
@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
|
* Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
|
||||||
@ -77,7 +78,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
|||||||
private volatile long minDelay;
|
private volatile long minDelay;
|
||||||
|
|
||||||
// the moment when current interval ends
|
// the moment when current interval ends
|
||||||
private volatile long intervalTime = System.currentTimeMillis();
|
private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
|
||||||
|
|
||||||
// switch to ensure only one threads does interval cutoffs
|
// switch to ensure only one threads does interval cutoffs
|
||||||
private AtomicBoolean resetDelay = new AtomicBoolean(true);
|
private AtomicBoolean resetDelay = new AtomicBoolean(true);
|
||||||
@ -147,7 +148,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
|||||||
* and internal queue state (deemed overloaded).
|
* and internal queue state (deemed overloaded).
|
||||||
*/
|
*/
|
||||||
private boolean needToDrop(CallRunner callRunner) {
|
private boolean needToDrop(CallRunner callRunner) {
|
||||||
long now = System.currentTimeMillis();
|
long now = EnvironmentEdgeManager.currentTime();
|
||||||
long callDelay = now - callRunner.getCall().timestamp;
|
long callDelay = now - callRunner.getCall().timestamp;
|
||||||
|
|
||||||
long localMinDelay = this.minDelay;
|
long localMinDelay = this.minDelay;
|
||||||
|
@ -203,7 +203,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||||||
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||||
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
||||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
||||||
callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
|
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount,
|
||||||
numCallQueues, callqReadShare, callqScanShare,
|
numCallQueues, callqReadShare, callqScanShare,
|
||||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
||||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
||||||
|
@ -17,44 +17,6 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hbase.ipc;
|
package org.apache.hadoop.hbase.ipc;
|
||||||
|
|
||||||
import com.google.common.collect.ImmutableList;
|
|
||||||
import com.google.common.collect.ImmutableMap;
|
|
||||||
import com.google.common.collect.ImmutableSet;
|
|
||||||
import com.google.common.collect.Maps;
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import com.google.protobuf.Message;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
|
||||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
|
||||||
import org.apache.hadoop.hbase.HConstants;
|
|
||||||
import org.apache.hadoop.hbase.client.Put;
|
|
||||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
|
||||||
import org.apache.hadoop.hbase.security.User;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
|
||||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
|
||||||
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
|
||||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
|
||||||
import org.apache.hadoop.hbase.util.Bytes;
|
|
||||||
import org.apache.hadoop.hbase.util.Threads;
|
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Rule;
|
|
||||||
import org.junit.Test;
|
|
||||||
import org.junit.experimental.categories.Category;
|
|
||||||
import org.junit.rules.TestRule;
|
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
|
||||||
import org.mockito.stubbing.Answer;
|
|
||||||
|
|
||||||
import java.io.IOException;
|
|
||||||
import java.net.InetSocketAddress;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
|
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertFalse;
|
import static org.junit.Assert.assertFalse;
|
||||||
import static org.junit.Assert.assertNotEquals;
|
import static org.junit.Assert.assertNotEquals;
|
||||||
@ -68,6 +30,51 @@ import static org.mockito.Mockito.timeout;
|
|||||||
import static org.mockito.Mockito.verify;
|
import static org.mockito.Mockito.verify;
|
||||||
import static org.mockito.Mockito.when;
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.net.InetSocketAddress;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.HashSet;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
|
|
||||||
|
import org.apache.commons.logging.Log;
|
||||||
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.hbase.CategoryBasedTimeout;
|
||||||
|
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||||
|
import org.apache.hadoop.hbase.HConstants;
|
||||||
|
import org.apache.hadoop.hbase.client.Put;
|
||||||
|
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
||||||
|
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||||
|
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||||
|
import org.apache.hadoop.hbase.security.User;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||||
|
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||||
|
import org.apache.hadoop.hbase.util.Bytes;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||||
|
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||||
|
import org.apache.hadoop.hbase.util.Threads;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.experimental.categories.Category;
|
||||||
|
import org.junit.rules.TestRule;
|
||||||
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
|
import org.mockito.stubbing.Answer;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableList;
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import com.google.common.collect.Maps;
|
||||||
|
import com.google.protobuf.Message;
|
||||||
|
|
||||||
@Category({RPCTests.class, SmallTests.class})
|
@Category({RPCTests.class, SmallTests.class})
|
||||||
public class TestSimpleRpcScheduler {
|
public class TestSimpleRpcScheduler {
|
||||||
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
|
@Rule public final TestRule timeout = CategoryBasedTimeout.builder().withTimeout(this.getClass()).
|
||||||
@ -218,7 +225,7 @@ public class TestSimpleRpcScheduler {
|
|||||||
scheduler.dispatch(smallCallTask);
|
scheduler.dispatch(smallCallTask);
|
||||||
|
|
||||||
while (work.size() < 8) {
|
while (work.size() < 8) {
|
||||||
Threads.sleepWithoutInterrupt(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
int seqSum = 0;
|
int seqSum = 0;
|
||||||
@ -298,7 +305,7 @@ public class TestSimpleRpcScheduler {
|
|||||||
scheduler.dispatch(scanCallTask);
|
scheduler.dispatch(scanCallTask);
|
||||||
|
|
||||||
while (work.size() < 6) {
|
while (work.size() < 6) {
|
||||||
Threads.sleepWithoutInterrupt(100);
|
Thread.sleep(100);
|
||||||
}
|
}
|
||||||
|
|
||||||
for (int i = 0; i < work.size() - 2; i += 3) {
|
for (int i = 0; i < work.size() - 2; i += 3) {
|
||||||
@ -326,6 +333,13 @@ public class TestSimpleRpcScheduler {
|
|||||||
}).when(callTask).run();
|
}).when(callTask).run();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
|
||||||
|
throws InterruptedException {
|
||||||
|
while (scheduler.getGeneralQueueLength() > 0) {
|
||||||
|
Thread.sleep(100);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSoftAndHardQueueLimits() throws Exception {
|
public void testSoftAndHardQueueLimits() throws Exception {
|
||||||
Configuration schedConf = HBaseConfiguration.create();
|
Configuration schedConf = HBaseConfiguration.create();
|
||||||
@ -354,9 +368,7 @@ public class TestSimpleRpcScheduler {
|
|||||||
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
|
||||||
scheduler.onConfigurationChange(schedConf);
|
scheduler.onConfigurationChange(schedConf);
|
||||||
assertFalse(scheduler.dispatch(putCallTask));
|
assertFalse(scheduler.dispatch(putCallTask));
|
||||||
while (scheduler.getGeneralQueueLength() > 0) {
|
waitUntilQueueEmpty(scheduler);
|
||||||
Threads.sleepWithoutInterrupt(100);
|
|
||||||
}
|
|
||||||
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
|
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
|
||||||
scheduler.onConfigurationChange(schedConf);
|
scheduler.onConfigurationChange(schedConf);
|
||||||
assertTrue(scheduler.dispatch(putCallTask));
|
assertTrue(scheduler.dispatch(putCallTask));
|
||||||
@ -365,8 +377,30 @@ public class TestSimpleRpcScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
|
||||||
|
|
||||||
|
private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
|
||||||
|
|
||||||
|
private long offset;
|
||||||
|
|
||||||
|
private final Set<String> threadNamePrefixs = new HashSet<>();
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long currentTime() {
|
||||||
|
for (String threadNamePrefix : threadNamePrefixs) {
|
||||||
|
if (Thread.currentThread().getName().startsWith(threadNamePrefix)) {
|
||||||
|
return timeQ.poll().longValue() + offset;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return System.currentTimeMillis();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCoDelScheduling() throws Exception {
|
public void testCoDelScheduling() throws Exception {
|
||||||
|
CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
|
||||||
|
envEdge.threadNamePrefixs.add("RW.default");
|
||||||
|
envEdge.threadNamePrefixs.add("B.default");
|
||||||
Configuration schedConf = HBaseConfiguration.create();
|
Configuration schedConf = HBaseConfiguration.create();
|
||||||
|
|
||||||
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
|
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
|
||||||
@ -379,36 +413,51 @@ public class TestSimpleRpcScheduler {
|
|||||||
HConstants.QOS_THRESHOLD);
|
HConstants.QOS_THRESHOLD);
|
||||||
try {
|
try {
|
||||||
scheduler.start();
|
scheduler.start();
|
||||||
|
EnvironmentEdgeManager.injectEdge(envEdge);
|
||||||
|
envEdge.offset = 5;
|
||||||
// calls faster than min delay
|
// calls faster than min delay
|
||||||
for (int i = 0; i < 100; i++) {
|
for (int i = 0; i < 100; i++) {
|
||||||
CallRunner cr = getMockedCallRunner();
|
long time = System.currentTimeMillis();
|
||||||
|
envEdge.timeQ.put(time);
|
||||||
|
CallRunner cr = getMockedCallRunner(time);
|
||||||
Thread.sleep(5);
|
Thread.sleep(5);
|
||||||
scheduler.dispatch(cr);
|
scheduler.dispatch(cr);
|
||||||
}
|
}
|
||||||
Thread.sleep(100); // make sure fast calls are handled
|
// make sure fast calls are handled
|
||||||
|
waitUntilQueueEmpty(scheduler);
|
||||||
|
Thread.sleep(100);
|
||||||
assertEquals("None of these calls should have been discarded", 0,
|
assertEquals("None of these calls should have been discarded", 0,
|
||||||
scheduler.getNumGeneralCallsDropped());
|
scheduler.getNumGeneralCallsDropped());
|
||||||
|
|
||||||
|
envEdge.offset = 6;
|
||||||
// calls slower than min delay, but not individually slow enough to be dropped
|
// calls slower than min delay, but not individually slow enough to be dropped
|
||||||
for (int i = 0; i < 20; i++) {
|
for (int i = 0; i < 20; i++) {
|
||||||
CallRunner cr = getMockedCallRunner();
|
long time = System.currentTimeMillis();
|
||||||
|
envEdge.timeQ.put(time);
|
||||||
|
CallRunner cr = getMockedCallRunner(time);
|
||||||
Thread.sleep(6);
|
Thread.sleep(6);
|
||||||
scheduler.dispatch(cr);
|
scheduler.dispatch(cr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(100); // make sure somewhat slow calls are handled
|
// make sure somewhat slow calls are handled
|
||||||
|
waitUntilQueueEmpty(scheduler);
|
||||||
|
Thread.sleep(100);
|
||||||
assertEquals("None of these calls should have been discarded", 0,
|
assertEquals("None of these calls should have been discarded", 0,
|
||||||
scheduler.getNumGeneralCallsDropped());
|
scheduler.getNumGeneralCallsDropped());
|
||||||
|
|
||||||
|
envEdge.offset = 12;
|
||||||
// now slow calls and the ones to be dropped
|
// now slow calls and the ones to be dropped
|
||||||
for (int i = 0; i < 20; i++) {
|
for (int i = 0; i < 20; i++) {
|
||||||
CallRunner cr = getMockedCallRunner();
|
long time = System.currentTimeMillis();
|
||||||
|
envEdge.timeQ.put(time);
|
||||||
|
CallRunner cr = getMockedCallRunner(time);
|
||||||
Thread.sleep(12);
|
Thread.sleep(12);
|
||||||
scheduler.dispatch(cr);
|
scheduler.dispatch(cr);
|
||||||
}
|
}
|
||||||
|
|
||||||
Thread.sleep(100); // make sure somewhat slow calls are handled
|
// make sure somewhat slow calls are handled
|
||||||
|
waitUntilQueueEmpty(scheduler);
|
||||||
|
Thread.sleep(100);
|
||||||
assertTrue("There should have been at least 12 calls dropped",
|
assertTrue("There should have been at least 12 calls dropped",
|
||||||
scheduler.getNumGeneralCallsDropped() > 12);
|
scheduler.getNumGeneralCallsDropped() > 12);
|
||||||
} finally {
|
} finally {
|
||||||
@ -416,7 +465,7 @@ public class TestSimpleRpcScheduler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private CallRunner getMockedCallRunner() throws IOException {
|
private CallRunner getMockedCallRunner(long timestamp) throws IOException {
|
||||||
CallRunner putCallTask = mock(CallRunner.class);
|
CallRunner putCallTask = mock(CallRunner.class);
|
||||||
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
||||||
putCall.param = RequestConverter.buildMutateRequest(
|
putCall.param = RequestConverter.buildMutateRequest(
|
||||||
@ -424,7 +473,7 @@ public class TestSimpleRpcScheduler {
|
|||||||
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
|
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
|
||||||
when(putCallTask.getCall()).thenReturn(putCall);
|
when(putCallTask.getCall()).thenReturn(putCall);
|
||||||
when(putCall.getHeader()).thenReturn(putHead);
|
when(putCall.getHeader()).thenReturn(putHead);
|
||||||
putCall.timestamp = System.currentTimeMillis();
|
putCall.timestamp = timestamp;
|
||||||
return putCallTask;
|
return putCallTask;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user