diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index 77a9d65bd43..820f24cb728 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -27,8 +27,7 @@ import java.util.AbstractQueue; import java.util.HashMap; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.locks.ReentrantLock; -import java.util.concurrent.locks.Condition; +import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; @@ -55,16 +54,15 @@ public class FairCallQueue extends AbstractQueue /* The queues */ private final ArrayList> queues; - /* Read locks */ - private final ReentrantLock takeLock = new ReentrantLock(); - private final Condition notEmpty = takeLock.newCondition(); + /* Track available permits for scheduled objects. All methods that will + * mutate a subqueue must acquire or release a permit on the semaphore. + * A semaphore is much faster than an exclusive lock because producers do + * not contend with consumers and consumers do not block other consumers + * while polling. + */ + private final Semaphore semaphore = new Semaphore(0); private void signalNotEmpty() { - takeLock.lock(); - try { - notEmpty.signal(); - } finally { - takeLock.unlock(); - } + semaphore.release(); } /* Multiplexer picks which queue to draw from */ @@ -112,28 +110,25 @@ public class FairCallQueue extends AbstractQueue } /** - * Returns the first non-empty queue with equal to startIdx, or - * or scans from highest to lowest priority queue. + * Returns an element first non-empty queue equal to the priority returned + * by the multiplexer or scans from highest to lowest priority queue. + * + * Caller must always acquire a semaphore permit before invoking. * - * @param startIdx the queue number to start searching at * @return the first non-empty queue with less priority, or null if * everything was empty */ - private BlockingQueue getFirstNonEmptyQueue(int startIdx) { - BlockingQueue queue = this.queues.get(startIdx); - if (queue.size() != 0) { - return queue; - } - final int numQueues = this.queues.size(); - for(int i=0; i < numQueues; i++) { - queue = this.queues.get(i); - if (queue.size() != 0) { - return queue; + private E removeNextElement() { + int priority = multiplexer.getAndAdvanceCurrentIndex(); + E e = queues.get(priority).poll(); + if (e == null) { + for (int idx = 0; e == null && idx < queues.size(); idx++) { + e = queues.get(idx).poll(); } } - - // All queues were empty - return null; + // guaranteed to find an element if caller acquired permit. + assert e != null : "consumer didn't acquire semaphore!"; + return e; } /* AbstractQueue and BlockingQueue methods */ @@ -184,9 +179,9 @@ public class FairCallQueue extends AbstractQueue int priorityLevel = e.getPriorityLevel(); BlockingQueue q = this.queues.get(priorityLevel); boolean ret = q.offer(e, timeout, unit); - - signalNotEmpty(); - + if (ret) { + signalNotEmpty(); + } return ret; } @@ -195,72 +190,21 @@ public class FairCallQueue extends AbstractQueue int priorityLevel = e.getPriorityLevel(); BlockingQueue q = this.queues.get(priorityLevel); boolean ret = q.offer(e); - - signalNotEmpty(); - + if (ret) { + signalNotEmpty(); + } return ret; } @Override public E take() throws InterruptedException { - int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); - - takeLock.lockInterruptibly(); - try { - // Wait while queue is empty - for (;;) { - BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); - if (q != null) { - // Got queue, so return if we can poll out an object - E e = q.poll(); - if (e != null) { - return e; - } - } - - notEmpty.await(); - } - } finally { - takeLock.unlock(); - } + semaphore.acquire(); + return removeNextElement(); } @Override - public E poll(long timeout, TimeUnit unit) - throws InterruptedException { - - int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); - - long nanos = unit.toNanos(timeout); - takeLock.lockInterruptibly(); - try { - for (;;) { - BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); - if (q != null) { - E e = q.poll(); - if (e != null) { - // Escape condition: there might be something available - return e; - } - } - - if (nanos <= 0) { - // Wait has elapsed - return null; - } - - try { - // Now wait on the condition for a bit. If we get - // spuriously awoken we'll re-loop - nanos = notEmpty.awaitNanos(nanos); - } catch (InterruptedException ie) { - notEmpty.signal(); // propagate to a non-interrupted thread - throw ie; - } - } - } finally { - takeLock.unlock(); - } + public E poll(long timeout, TimeUnit unit) throws InterruptedException { + return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null; } /** @@ -269,15 +213,7 @@ public class FairCallQueue extends AbstractQueue */ @Override public E poll() { - int startIdx = this.multiplexer.getAndAdvanceCurrentIndex(); - - BlockingQueue q = this.getFirstNonEmptyQueue(startIdx); - if (q == null) { - return null; // everything is empty - } - - // Delegate to the sub-queue's poll, which could still return null - return q.poll(); + return semaphore.tryAcquire() ? removeNextElement() : null; } /** @@ -285,12 +221,11 @@ public class FairCallQueue extends AbstractQueue */ @Override public E peek() { - BlockingQueue q = this.getFirstNonEmptyQueue(0); - if (q == null) { - return null; - } else { - return q.peek(); + E e = null; + for (int i=0; e == null && i < queues.size(); i++) { + e = queues.get(i).peek(); } + return e; } /** @@ -301,11 +236,7 @@ public class FairCallQueue extends AbstractQueue */ @Override public int size() { - int size = 0; - for (BlockingQueue q : this.queues) { - size += q.size(); - } - return size; + return semaphore.availablePermits(); } /** @@ -324,20 +255,24 @@ public class FairCallQueue extends AbstractQueue */ @Override public int drainTo(Collection c, int maxElements) { - int sum = 0; - for (BlockingQueue q : this.queues) { - sum += q.drainTo(c, maxElements); + // initially take all permits to stop consumers from modifying queues + // while draining. will restore any excess when done draining. + final int permits = semaphore.drainPermits(); + final int numElements = Math.min(maxElements, permits); + int numRemaining = numElements; + for (int i=0; numRemaining > 0 && i < queues.size(); i++) { + numRemaining -= queues.get(i).drainTo(c, numRemaining); } - return sum; + int drained = numElements - numRemaining; + if (permits > drained) { // restore unused permits. + semaphore.release(permits - drained); + } + return drained; } @Override public int drainTo(Collection c) { - int sum = 0; - for (BlockingQueue q : this.queues) { - sum += q.drainTo(c); - } - return sum; + return drainTo(c, Integer.MAX_VALUE); } /**