HBASE-15360 Fix flaky TestSimpleRpcScheduler; ADDENDUM
This commit is contained in:
parent
47eb79311f
commit
a3d550fbca
|
@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
|
||||
/**
|
||||
* Adaptive LIFO blocking queue utilizing CoDel algorithm to prevent queue overloading.
|
||||
|
@ -77,7 +78,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
|||
private volatile long minDelay;
|
||||
|
||||
// the moment when current interval ends
|
||||
private volatile long intervalTime = System.currentTimeMillis();
|
||||
private volatile long intervalTime = EnvironmentEdgeManager.currentTime();
|
||||
|
||||
// switch to ensure only one threads does interval cutoffs
|
||||
private AtomicBoolean resetDelay = new AtomicBoolean(true);
|
||||
|
@ -147,7 +148,7 @@ public class AdaptiveLifoCoDelCallQueue implements BlockingQueue<CallRunner> {
|
|||
* and internal queue state (deemed overloaded).
|
||||
*/
|
||||
private boolean needToDrop(CallRunner callRunner) {
|
||||
long now = System.currentTimeMillis();
|
||||
long now = EnvironmentEdgeManager.currentTime();
|
||||
long callDelay = now - callRunner.getCall().timestamp;
|
||||
|
||||
long localMinDelay = this.minDelay;
|
||||
|
|
|
@ -203,7 +203,7 @@ public class SimpleRpcScheduler extends RpcScheduler implements ConfigurationObs
|
|||
} else if (callQueueType.equals(CALL_QUEUE_TYPE_CODEL_CONF_VALUE)) {
|
||||
Object[] callQueueInitArgs = {maxQueueLength, codelTargetDelay, codelInterval,
|
||||
codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches};
|
||||
callExecutor = new RWQueueRpcExecutor("B.default", handlerCount,
|
||||
callExecutor = new RWQueueRpcExecutor("RW.default", handlerCount,
|
||||
numCallQueues, callqReadShare, callqScanShare,
|
||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs,
|
||||
AdaptiveLifoCoDelCallQueue.class, callQueueInitArgs);
|
||||
|
|
|
@ -16,41 +16,6 @@
|
|||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.ipc;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import com.google.protobuf.Message;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertNotEquals;
|
||||
|
@ -64,6 +29,55 @@ import static org.mockito.Mockito.timeout;
|
|||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.hbase.HBaseConfiguration;
|
||||
import org.apache.hadoop.hbase.HConstants;
|
||||
import org.apache.hadoop.hbase.client.Put;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.ipc.RpcServer.Call;
|
||||
import org.apache.hadoop.hbase.monitoring.MonitoredRPCHandlerImpl;
|
||||
import org.apache.hadoop.hbase.protobuf.RequestConverter;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.ClientProtos.ScanRequest;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos;
|
||||
import org.apache.hadoop.hbase.protobuf.generated.RPCProtos.RequestHeader;
|
||||
import org.apache.hadoop.hbase.security.User;
|
||||
import org.apache.hadoop.hbase.testclassification.RPCTests;
|
||||
import org.apache.hadoop.hbase.testclassification.SmallTests;
|
||||
import org.apache.hadoop.hbase.util.Bytes;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdge;
|
||||
import org.apache.hadoop.hbase.util.EnvironmentEdgeManager;
|
||||
import org.apache.hadoop.hbase.util.Threads;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.experimental.categories.Category;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import com.google.common.collect.ImmutableList;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.ImmutableSet;
|
||||
import com.google.common.collect.Maps;
|
||||
import com.google.protobuf.Message;
|
||||
|
||||
@Category(SmallTests.class)
|
||||
public class TestSimpleRpcScheduler {
|
||||
private static final Log LOG = LogFactory.getLog(TestSimpleRpcScheduler.class);
|
||||
|
@ -212,7 +226,7 @@ public class TestSimpleRpcScheduler {
|
|||
scheduler.dispatch(smallCallTask);
|
||||
|
||||
while (work.size() < 8) {
|
||||
Threads.sleepWithoutInterrupt(100);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
int seqSum = 0;
|
||||
|
@ -292,7 +306,7 @@ public class TestSimpleRpcScheduler {
|
|||
scheduler.dispatch(scanCallTask);
|
||||
|
||||
while (work.size() < 6) {
|
||||
Threads.sleepWithoutInterrupt(100);
|
||||
Thread.sleep(100);
|
||||
}
|
||||
|
||||
for (int i = 0; i < work.size() - 2; i += 3) {
|
||||
|
@ -320,6 +334,13 @@ public class TestSimpleRpcScheduler {
|
|||
}).when(callTask).run();
|
||||
}
|
||||
|
||||
private static void waitUntilQueueEmpty(SimpleRpcScheduler scheduler)
|
||||
throws InterruptedException {
|
||||
while (scheduler.getGeneralQueueLength() > 0) {
|
||||
Thread.sleep(100);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSoftAndHardQueueLimits() throws Exception {
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
|
@ -348,9 +369,7 @@ public class TestSimpleRpcScheduler {
|
|||
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 0);
|
||||
scheduler.onConfigurationChange(schedConf);
|
||||
assertFalse(scheduler.dispatch(putCallTask));
|
||||
while (scheduler.getGeneralQueueLength() > 0) {
|
||||
Threads.sleepWithoutInterrupt(100);
|
||||
}
|
||||
waitUntilQueueEmpty(scheduler);
|
||||
schedConf.setInt("hbase.ipc.server.max.callqueue.length", 1);
|
||||
scheduler.onConfigurationChange(schedConf);
|
||||
assertTrue(scheduler.dispatch(putCallTask));
|
||||
|
@ -359,8 +378,30 @@ public class TestSimpleRpcScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private static final class CoDelEnvironmentEdge implements EnvironmentEdge {
|
||||
|
||||
private final BlockingQueue<Long> timeQ = new LinkedBlockingQueue<>();
|
||||
|
||||
private long offset;
|
||||
|
||||
private final Set<String> threadNamePrefixs = new HashSet<>();
|
||||
|
||||
@Override
|
||||
public long currentTime() {
|
||||
for (String threadNamePrefix : threadNamePrefixs) {
|
||||
if (Thread.currentThread().getName().startsWith(threadNamePrefix)) {
|
||||
return timeQ.poll().longValue() + offset;
|
||||
}
|
||||
}
|
||||
return System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testCoDelScheduling() throws Exception {
|
||||
CoDelEnvironmentEdge envEdge = new CoDelEnvironmentEdge();
|
||||
envEdge.threadNamePrefixs.add("RW.default");
|
||||
envEdge.threadNamePrefixs.add("B.default");
|
||||
Configuration schedConf = HBaseConfiguration.create();
|
||||
|
||||
schedConf.set(SimpleRpcScheduler.CALL_QUEUE_TYPE_CONF_KEY,
|
||||
|
@ -373,36 +414,51 @@ public class TestSimpleRpcScheduler {
|
|||
HConstants.QOS_THRESHOLD);
|
||||
try {
|
||||
scheduler.start();
|
||||
|
||||
EnvironmentEdgeManager.injectEdge(envEdge);
|
||||
envEdge.offset = 5;
|
||||
// calls faster than min delay
|
||||
for (int i = 0; i < 100; i++) {
|
||||
CallRunner cr = getMockedCallRunner();
|
||||
long time = System.currentTimeMillis();
|
||||
envEdge.timeQ.put(time);
|
||||
CallRunner cr = getMockedCallRunner(time);
|
||||
Thread.sleep(5);
|
||||
scheduler.dispatch(cr);
|
||||
}
|
||||
Thread.sleep(100); // make sure fast calls are handled
|
||||
// make sure fast calls are handled
|
||||
waitUntilQueueEmpty(scheduler);
|
||||
Thread.sleep(100);
|
||||
assertEquals("None of these calls should have been discarded", 0,
|
||||
scheduler.getNumGeneralCallsDropped());
|
||||
|
||||
envEdge.offset = 6;
|
||||
// calls slower than min delay, but not individually slow enough to be dropped
|
||||
for (int i = 0; i < 20; i++) {
|
||||
CallRunner cr = getMockedCallRunner();
|
||||
long time = System.currentTimeMillis();
|
||||
envEdge.timeQ.put(time);
|
||||
CallRunner cr = getMockedCallRunner(time);
|
||||
Thread.sleep(6);
|
||||
scheduler.dispatch(cr);
|
||||
}
|
||||
|
||||
Thread.sleep(100); // make sure somewhat slow calls are handled
|
||||
// make sure somewhat slow calls are handled
|
||||
waitUntilQueueEmpty(scheduler);
|
||||
Thread.sleep(100);
|
||||
assertEquals("None of these calls should have been discarded", 0,
|
||||
scheduler.getNumGeneralCallsDropped());
|
||||
|
||||
envEdge.offset = 12;
|
||||
// now slow calls and the ones to be dropped
|
||||
for (int i = 0; i < 20; i++) {
|
||||
CallRunner cr = getMockedCallRunner();
|
||||
long time = System.currentTimeMillis();
|
||||
envEdge.timeQ.put(time);
|
||||
CallRunner cr = getMockedCallRunner(time);
|
||||
Thread.sleep(12);
|
||||
scheduler.dispatch(cr);
|
||||
}
|
||||
|
||||
Thread.sleep(100); // make sure somewhat slow calls are handled
|
||||
// make sure somewhat slow calls are handled
|
||||
waitUntilQueueEmpty(scheduler);
|
||||
Thread.sleep(100);
|
||||
assertTrue("There should have been at least 12 calls dropped",
|
||||
scheduler.getNumGeneralCallsDropped() > 12);
|
||||
} finally {
|
||||
|
@ -410,7 +466,7 @@ public class TestSimpleRpcScheduler {
|
|||
}
|
||||
}
|
||||
|
||||
private CallRunner getMockedCallRunner() throws IOException {
|
||||
private CallRunner getMockedCallRunner(long timestamp) throws IOException {
|
||||
CallRunner putCallTask = mock(CallRunner.class);
|
||||
RpcServer.Call putCall = mock(RpcServer.Call.class);
|
||||
putCall.param = RequestConverter.buildMutateRequest(
|
||||
|
@ -418,7 +474,7 @@ public class TestSimpleRpcScheduler {
|
|||
RPCProtos.RequestHeader putHead = RPCProtos.RequestHeader.newBuilder().setMethodName("mutate").build();
|
||||
when(putCallTask.getCall()).thenReturn(putCall);
|
||||
when(putCall.getHeader()).thenReturn(putHead);
|
||||
putCall.timestamp = System.currentTimeMillis();
|
||||
putCall.timestamp = timestamp;
|
||||
return putCallTask;
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue