HADOOP-17027. Add tests for reading fair call queue capacity weight configs. Contributed by Fengnan Li

This commit is contained in:
Mingliang Liu 2020-05-07 16:50:23 -07:00
parent d59de27c01
commit e9e1ead089
No known key found for this signature in database
GPG Key ID: BC2FB8C6908A0C16
2 changed files with 65 additions and 15 deletions

View File

@ -176,6 +176,12 @@ public class TestCallQueueManager {
private static final Class<? extends RpcScheduler> schedulerClass private static final Class<? extends RpcScheduler> schedulerClass
= CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class); = CallQueueManager.convertSchedulerClass(DefaultRpcScheduler.class);
private static final Class<? extends BlockingQueue<FakeCall>> fcqueueClass
= CallQueueManager.convertQueueClass(FairCallQueue.class, FakeCall.class);
private static final Class<? extends RpcScheduler> rpcSchedulerClass
= CallQueueManager.convertSchedulerClass(DecayRpcScheduler.class);
@Test @Test
public void testCallQueueCapacity() throws InterruptedException { public void testCallQueueCapacity() throws InterruptedException {
manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false, manager = new CallQueueManager<FakeCall>(queueClass, schedulerClass, false,
@ -319,6 +325,55 @@ public class TestCallQueueManager {
assertEquals(totalCallsConsumed, totalCallsCreated); assertEquals(totalCallsConsumed, totalCallsCreated);
} }
@Test
public void testQueueCapacity() throws InterruptedException {
int capacity = 4;
String ns = "ipc.8020";
conf.setInt("ipc.8020.scheduler.priority.levels", 2);
conf.set("ipc.8020.callqueue.capacity.weights", "1,3");
manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
capacity, ns, conf);
// insert 4 calls with 2 at each priority
// since the queue with priority 0 has only 1 capacity, the second call
// with p0 will be overflowed to queue with priority 1
for (int i = 0; i < capacity; i++) {
FakeCall fc = new FakeCall(i);
fc.setPriorityLevel(i%2);
manager.put(fc);
}
// get calls, the order should be
// call 0 with p0
// call 1 with p1
// call 2 with p0 since overflow
// call 3 with p1
assertEquals(manager.take().priorityLevel, 0);
assertEquals(manager.take().priorityLevel, 1);
assertEquals(manager.take().priorityLevel, 0);
assertEquals(manager.take().priorityLevel, 1);
conf.set("ipc.8020.callqueue.capacity.weights", "1,1");
manager = new CallQueueManager<>(fcqueueClass, rpcSchedulerClass, false,
capacity, ns, conf);
for (int i = 0; i < capacity; i++) {
FakeCall fc = new FakeCall(i);
fc.setPriorityLevel(i%2);
manager.put(fc);
}
// get calls, the order should be
// call 0 with p0
// call 2 with p0
// call 1 with p1
// call 3 with p1
assertEquals(manager.take().priorityLevel, 0);
assertEquals(manager.take().priorityLevel, 0);
assertEquals(manager.take().priorityLevel, 1);
assertEquals(manager.take().priorityLevel, 1);
}
public static class ExceptionFakeCall implements Schedulable { public static class ExceptionFakeCall implements Schedulable {
public ExceptionFakeCall() { public ExceptionFakeCall() {
throw new IllegalArgumentException("Exception caused by call queue " + throw new IllegalArgumentException("Exception caused by call queue " +

View File

@ -170,18 +170,13 @@ public class TestFairCallQueue {
// default weights i.e. all queues share capacity // default weights i.e. all queues share capacity
fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf); fcq = new FairCallQueue<Schedulable>(numQueues, 4, "ns", conf);
FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>( FairCallQueue<Schedulable> fcq1 = new FairCallQueue<Schedulable>(
numQueues, capacity, "ns", new int[]{3, 1}, conf); numQueues, capacity, "ns", new int[]{1, 3}, conf);
for (int i=0; i < capacity; i++) { for (int i=0; i < capacity; i++) {
Schedulable call = mockCall("u", i%2); Schedulable call = mockCall("u", i%2);
calls.add(call); calls.add(call);
fcq.add(call); fcq.add(call);
fcq1.add(call); fcq1.add(call);
call = mockCall("u", (i++)%2);
calls.add(call);
fcq.add(call);
fcq1.add(call);
} }
final AtomicInteger currentIndex = new AtomicInteger(); final AtomicInteger currentIndex = new AtomicInteger();
@ -200,24 +195,24 @@ public class TestFairCallQueue {
// either queue will have two calls // either queue will have two calls
// v // v
// 0 2 // 0 1
// 1 3 // 2 3
currentIndex.set(1); currentIndex.set(1);
assertSame(calls.get(2), fcq.poll()); assertSame(calls.get(1), fcq.poll());
assertSame(calls.get(3), fcq.poll()); assertSame(calls.get(3), fcq.poll());
assertSame(calls.get(0), fcq.poll()); assertSame(calls.get(0), fcq.poll());
assertSame(calls.get(1), fcq.poll()); assertSame(calls.get(2), fcq.poll());
// queues with different number of calls // queues with different number of calls
// v // v
// 0 3 // 0 1
// 1 // 2
// 2 // 3
currentIndex.set(1); currentIndex.set(1);
assertSame(calls.get(3), fcq1.poll());
assertSame(calls.get(0), fcq1.poll());
assertSame(calls.get(1), fcq1.poll()); assertSame(calls.get(1), fcq1.poll());
assertSame(calls.get(2), fcq1.poll()); assertSame(calls.get(2), fcq1.poll());
assertSame(calls.get(3), fcq1.poll());
assertSame(calls.get(0), fcq1.poll());
} }
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")