diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index c2d3cd80ca7..77a9d65bd43 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -112,19 +112,21 @@ public FairCallQueue(int priorityLevels, int capacity, String ns, } /** - * Returns the first non-empty queue with equal or lesser priority - * than startIdx. Wraps around, searching a maximum of N - * queues, where N is this.queues.size(). + * Returns the first non-empty queue with equal to startIdx, or + * or scans from highest to lowest priority queue. * * @param startIdx the queue number to start searching at * @return the first non-empty queue with less priority, or null if * everything was empty */ private BlockingQueue getFirstNonEmptyQueue(int startIdx) { + BlockingQueue queue = this.queues.get(startIdx); + if (queue.size() != 0) { + return queue; + } final int numQueues = this.queues.size(); for(int i=0; i < numQueues; i++) { - int idx = (i + startIdx) % numQueues; // offset and wrap around - BlockingQueue queue = this.queues.get(idx); + queue = this.queues.get(i); if (queue.size() != 0) { return queue; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java index 96dea803764..901a7718788 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java @@ -28,9 +28,12 @@ import java.lang.management.ManagementFactory; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.BlockingQueue; - import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; import org.apache.hadoop.conf.Configuration; public class TestFairCallQueue extends TestCase { @@ -43,6 +46,7 @@ private Schedulable mockCall(String id, int priority) { when(ugi.getUserName()).thenReturn(id); when(mockCall.getUserGroupInformation()).thenReturn(ugi); when(mockCall.getPriorityLevel()).thenReturn(priority); + when(mockCall.toString()).thenReturn("id=" + id + " priority=" + priority); return mockCall; } @@ -78,6 +82,57 @@ public void testTotalCapacityOfSubQueues() { assertEquals(fairCallQueue.remainingCapacity(), 1025); } + @Test + public void testPrioritization() { + int numQueues = 10; + Configuration conf = new Configuration(); + fcq = new FairCallQueue(numQueues, numQueues, "ns", conf); + + //Schedulable[] calls = new Schedulable[numCalls]; + List calls = new ArrayList<>(); + for (int i=0; i < numQueues; i++) { + Schedulable call = mockCall("u", i); + calls.add(call); + fcq.add(call); + } + + final AtomicInteger currentIndex = new AtomicInteger(); + fcq.setMultiplexer(new RpcMultiplexer(){ + @Override + public int getAndAdvanceCurrentIndex() { + return currentIndex.get(); + } + }); + + // if there is no call at a given index, return the next highest + // priority call available. + // v + //0123456789 + currentIndex.set(3); + assertSame(calls.get(3), fcq.poll()); + assertSame(calls.get(0), fcq.poll()); + assertSame(calls.get(1), fcq.poll()); + // v + //--2-456789 + currentIndex.set(6); + assertSame(calls.get(6), fcq.poll()); + assertSame(calls.get(2), fcq.poll()); + assertSame(calls.get(4), fcq.poll()); + // v + //-----5-789 + currentIndex.set(8); + assertSame(calls.get(8), fcq.poll()); + // v + //-----5-7-9 + currentIndex.set(9); + assertSame(calls.get(9), fcq.poll()); + assertSame(calls.get(5), fcq.poll()); + assertSame(calls.get(7), fcq.poll()); + //---------- + assertNull(fcq.poll()); + assertNull(fcq.poll()); + } + // // Ensure that FairCallQueue properly implements BlockingQueue //