HADOOP-14032. Reduce fair call queue priority inversion. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2017-02-09 10:04:28 -06:00
parent a8a594b4c8
commit a0bfb41504
2 changed files with 63 additions and 6 deletions

View File

@ -112,19 +112,21 @@ public FairCallQueue(int priorityLevels, int capacity, String ns,
} }
/** /**
* Returns the first non-empty queue with equal or lesser priority * Returns the first non-empty queue with equal to <i>startIdx</i>, or
* than <i>startIdx</i>. Wraps around, searching a maximum of N * or scans from highest to lowest priority queue.
* queues, where N is this.queues.size().
* *
* @param startIdx the queue number to start searching at * @param startIdx the queue number to start searching at
* @return the first non-empty queue with less priority, or null if * @return the first non-empty queue with less priority, or null if
* everything was empty * everything was empty
*/ */
private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) { private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
BlockingQueue<E> queue = this.queues.get(startIdx);
if (queue.size() != 0) {
return queue;
}
final int numQueues = this.queues.size(); final int numQueues = this.queues.size();
for(int i=0; i < numQueues; i++) { for(int i=0; i < numQueues; i++) {
int idx = (i + startIdx) % numQueues; // offset and wrap around queue = this.queues.get(i);
BlockingQueue<E> queue = this.queues.get(idx);
if (queue.size() != 0) { if (queue.size() != 0) {
return queue; return queue;
} }

View File

@ -28,9 +28,12 @@
import java.lang.management.ManagementFactory; import java.lang.management.ManagementFactory;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.junit.Test;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
public class TestFairCallQueue extends TestCase { public class TestFairCallQueue extends TestCase {
@ -43,6 +46,7 @@ private Schedulable mockCall(String id, int priority) {
when(ugi.getUserName()).thenReturn(id); when(ugi.getUserName()).thenReturn(id);
when(mockCall.getUserGroupInformation()).thenReturn(ugi); when(mockCall.getUserGroupInformation()).thenReturn(ugi);
when(mockCall.getPriorityLevel()).thenReturn(priority); when(mockCall.getPriorityLevel()).thenReturn(priority);
when(mockCall.toString()).thenReturn("id=" + id + " priority=" + priority);
return mockCall; return mockCall;
} }
@ -78,6 +82,57 @@ public void testTotalCapacityOfSubQueues() {
assertEquals(fairCallQueue.remainingCapacity(), 1025); assertEquals(fairCallQueue.remainingCapacity(), 1025);
} }
@Test
public void testPrioritization() {
int numQueues = 10;
Configuration conf = new Configuration();
fcq = new FairCallQueue<Schedulable>(numQueues, numQueues, "ns", conf);
//Schedulable[] calls = new Schedulable[numCalls];
List<Schedulable> calls = new ArrayList<>();
for (int i=0; i < numQueues; i++) {
Schedulable call = mockCall("u", i);
calls.add(call);
fcq.add(call);
}
final AtomicInteger currentIndex = new AtomicInteger();
fcq.setMultiplexer(new RpcMultiplexer(){
@Override
public int getAndAdvanceCurrentIndex() {
return currentIndex.get();
}
});
// if there is no call at a given index, return the next highest
// priority call available.
// v
//0123456789
currentIndex.set(3);
assertSame(calls.get(3), fcq.poll());
assertSame(calls.get(0), fcq.poll());
assertSame(calls.get(1), fcq.poll());
// v
//--2-456789
currentIndex.set(6);
assertSame(calls.get(6), fcq.poll());
assertSame(calls.get(2), fcq.poll());
assertSame(calls.get(4), fcq.poll());
// v
//-----5-789
currentIndex.set(8);
assertSame(calls.get(8), fcq.poll());
// v
//-----5-7-9
currentIndex.set(9);
assertSame(calls.get(9), fcq.poll());
assertSame(calls.get(5), fcq.poll());
assertSame(calls.get(7), fcq.poll());
//----------
assertNull(fcq.poll());
assertNull(fcq.poll());
}
// //
// Ensure that FairCallQueue properly implements BlockingQueue // Ensure that FairCallQueue properly implements BlockingQueue
// //