HADOOP-13189. FairCallQueue makes callQueue larger than the configured capacity. Contributed by Vinitha Gankidi.

This commit is contained in:
Konstantin V Shvachko 2016-06-16 18:20:49 -07:00
parent 04682cc6b0
commit 408848d1e9
4 changed files with 37 additions and 17 deletions

View File

@ -72,8 +72,8 @@ public class CallQueueManager<E> {
this.clientBackOffEnabled = clientBackOffEnabled; this.clientBackOffEnabled = clientBackOffEnabled;
this.putRef = new AtomicReference<BlockingQueue<E>>(bq); this.putRef = new AtomicReference<BlockingQueue<E>>(bq);
this.takeRef = new AtomicReference<BlockingQueue<E>>(bq); this.takeRef = new AtomicReference<BlockingQueue<E>>(bq);
LOG.info("Using callQueue: " + backingClass + " scheduler: " + LOG.info("Using callQueue: " + backingClass + " queueCapacity: " +
schedulerClass); maxQueueSize + " scheduler: " + schedulerClass);
} }
private static <T extends RpcScheduler> T createScheduler( private static <T extends RpcScheduler> T createScheduler(

View File

@ -75,11 +75,12 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/** /**
* Create a FairCallQueue. * Create a FairCallQueue.
* @param capacity the maximum size of each sub-queue * @param capacity the total size of all sub-queues
* @param ns the prefix to use for configuration * @param ns the prefix to use for configuration
* @param conf the configuration to read from * @param conf the configuration to read from
* Notes: the FairCallQueue has no fixed capacity. Rather, it has a minimum * Notes: Each sub-queue has a capacity of `capacity / numSubqueues`.
* capacity of `capacity` and a maximum capacity of `capacity * number_queues` * The first or the highest priority sub-queue has an excess capacity
* of `capacity % numSubqueues`
*/ */
public FairCallQueue(int priorityLevels, int capacity, String ns, public FairCallQueue(int priorityLevels, int capacity, String ns,
Configuration conf) { Configuration conf) {
@ -88,13 +89,19 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
"at least 1"); "at least 1");
} }
int numQueues = priorityLevels; int numQueues = priorityLevels;
LOG.info("FairCallQueue is in use with " + numQueues + " queues."); LOG.info("FairCallQueue is in use with " + numQueues +
" queues with total capacity of " + capacity);
this.queues = new ArrayList<BlockingQueue<E>>(numQueues); this.queues = new ArrayList<BlockingQueue<E>>(numQueues);
this.overflowedCalls = new ArrayList<AtomicLong>(numQueues); this.overflowedCalls = new ArrayList<AtomicLong>(numQueues);
int queueCapacity = capacity / numQueues;
int capacityForFirstQueue = queueCapacity + (capacity % numQueues);
for(int i=0; i < numQueues; i++) { for(int i=0; i < numQueues; i++) {
this.queues.add(new LinkedBlockingQueue<E>(capacity)); if (i == 0) {
this.queues.add(new LinkedBlockingQueue<E>(capacityForFirstQueue));
} else {
this.queues.add(new LinkedBlockingQueue<E>(queueCapacity));
}
this.overflowedCalls.add(new AtomicLong(0)); this.overflowedCalls.add(new AtomicLong(0));
} }

View File

@ -214,9 +214,9 @@ public class TestCallQueueManager {
assertTrue(queue.getCanonicalName().equals(queueClassName)); assertTrue(queue.getCanonicalName().equals(queueClassName));
manager = new CallQueueManager<FakeCall>(queue, scheduler, false, manager = new CallQueueManager<FakeCall>(queue, scheduler, false,
2, "", conf); 8, "", conf);
// Default FCQ has 4 levels and the max capacity is 2 x 4 // Default FCQ has 4 levels and the max capacity is 8
assertCanPut(manager, 3, 3); assertCanPut(manager, 3, 3);
} }

View File

@ -18,12 +18,6 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -68,7 +62,26 @@ public class TestFairCallQueue extends TestCase {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2); conf.setInt("ns." + FairCallQueue.IPC_CALLQUEUE_PRIORITY_LEVELS_KEY, 2);
fcq = new FairCallQueue<Schedulable>(2, 5, "ns", conf); fcq = new FairCallQueue<Schedulable>(2, 10, "ns", conf);
}
// Validate that the total capacity of all subqueues equals
// the maxQueueSize for different values of maxQueueSize
public void testTotalCapacityOfSubQueues() {
Configuration conf = new Configuration();
FairCallQueue<Schedulable> fairCallQueue;
fairCallQueue = new FairCallQueue<Schedulable>(1, 1000, "ns", conf);
assertEquals(fairCallQueue.remainingCapacity(), 1000);
fairCallQueue = new FairCallQueue<Schedulable>(4, 1000, "ns", conf);
assertEquals(fairCallQueue.remainingCapacity(), 1000);
fairCallQueue = new FairCallQueue<Schedulable>(7, 1000, "ns", conf);
assertEquals(fairCallQueue.remainingCapacity(), 1000);
fairCallQueue = new FairCallQueue<Schedulable>(1, 1025, "ns", conf);
assertEquals(fairCallQueue.remainingCapacity(), 1025);
fairCallQueue = new FairCallQueue<Schedulable>(4, 1025, "ns", conf);
assertEquals(fairCallQueue.remainingCapacity(), 1025);
fairCallQueue = new FairCallQueue<Schedulable>(7, 1025, "ns", conf);
assertEquals(fairCallQueue.remainingCapacity(), 1025);
} }
// //