HADOOP-14033. Reduce fair call queue lock contention. Contributed by Daryn Sharp.

This commit is contained in:
Kihwal Lee 2017-02-09 16:17:24 -06:00
parent 9b85053583
commit 0c01cf5798
1 changed files with 51 additions and 116 deletions

View File

@ -27,8 +27,7 @@ import java.util.AbstractQueue;
import java.util.HashMap;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
@ -55,16 +54,15 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
/* The queues */
private final ArrayList<BlockingQueue<E>> queues;
/* Read locks */
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
/* Track available permits for scheduled objects. All methods that will
* mutate a subqueue must acquire or release a permit on the semaphore.
* A semaphore is much faster than an exclusive lock because producers do
* not contend with consumers and consumers do not block other consumers
* while polling.
*/
private final Semaphore semaphore = new Semaphore(0);
private void signalNotEmpty() {
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
semaphore.release();
}
/* Multiplexer picks which queue to draw from */
@ -112,28 +110,25 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
}
/**
* Returns the first non-empty queue with equal to <i>startIdx</i>, or
* or scans from highest to lowest priority queue.
* Returns an element first non-empty queue equal to the priority returned
* by the multiplexer or scans from highest to lowest priority queue.
*
* Caller must always acquire a semaphore permit before invoking.
*
* @param startIdx the queue number to start searching at
* @return the first non-empty queue with less priority, or null if
* everything was empty
*/
private BlockingQueue<E> getFirstNonEmptyQueue(int startIdx) {
BlockingQueue<E> queue = this.queues.get(startIdx);
if (queue.size() != 0) {
return queue;
}
final int numQueues = this.queues.size();
for(int i=0; i < numQueues; i++) {
queue = this.queues.get(i);
if (queue.size() != 0) {
return queue;
private E removeNextElement() {
int priority = multiplexer.getAndAdvanceCurrentIndex();
E e = queues.get(priority).poll();
if (e == null) {
for (int idx = 0; e == null && idx < queues.size(); idx++) {
e = queues.get(idx).poll();
}
}
// All queues were empty
return null;
// guaranteed to find an element if caller acquired permit.
assert e != null : "consumer didn't acquire semaphore!";
return e;
}
/* AbstractQueue and BlockingQueue methods */
@ -184,9 +179,9 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e, timeout, unit);
signalNotEmpty();
if (ret) {
signalNotEmpty();
}
return ret;
}
@ -195,72 +190,21 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
int priorityLevel = e.getPriorityLevel();
BlockingQueue<E> q = this.queues.get(priorityLevel);
boolean ret = q.offer(e);
signalNotEmpty();
if (ret) {
signalNotEmpty();
}
return ret;
}
@Override
public E take() throws InterruptedException {
int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
takeLock.lockInterruptibly();
try {
// Wait while queue is empty
for (;;) {
BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
if (q != null) {
// Got queue, so return if we can poll out an object
E e = q.poll();
if (e != null) {
return e;
}
}
notEmpty.await();
}
} finally {
takeLock.unlock();
}
semaphore.acquire();
return removeNextElement();
}
@Override
public E poll(long timeout, TimeUnit unit)
throws InterruptedException {
int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
long nanos = unit.toNanos(timeout);
takeLock.lockInterruptibly();
try {
for (;;) {
BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
if (q != null) {
E e = q.poll();
if (e != null) {
// Escape condition: there might be something available
return e;
}
}
if (nanos <= 0) {
// Wait has elapsed
return null;
}
try {
// Now wait on the condition for a bit. If we get
// spuriously awoken we'll re-loop
nanos = notEmpty.awaitNanos(nanos);
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to a non-interrupted thread
throw ie;
}
}
} finally {
takeLock.unlock();
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
return semaphore.tryAcquire(timeout, unit) ? removeNextElement() : null;
}
/**
@ -269,15 +213,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public E poll() {
int startIdx = this.multiplexer.getAndAdvanceCurrentIndex();
BlockingQueue<E> q = this.getFirstNonEmptyQueue(startIdx);
if (q == null) {
return null; // everything is empty
}
// Delegate to the sub-queue's poll, which could still return null
return q.poll();
return semaphore.tryAcquire() ? removeNextElement() : null;
}
/**
@ -285,12 +221,11 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public E peek() {
BlockingQueue<E> q = this.getFirstNonEmptyQueue(0);
if (q == null) {
return null;
} else {
return q.peek();
E e = null;
for (int i=0; e == null && i < queues.size(); i++) {
e = queues.get(i).peek();
}
return e;
}
/**
@ -301,11 +236,7 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public int size() {
int size = 0;
for (BlockingQueue<E> q : this.queues) {
size += q.size();
}
return size;
return semaphore.availablePermits();
}
/**
@ -324,20 +255,24 @@ public class FairCallQueue<E extends Schedulable> extends AbstractQueue<E>
*/
@Override
public int drainTo(Collection<? super E> c, int maxElements) {
int sum = 0;
for (BlockingQueue<E> q : this.queues) {
sum += q.drainTo(c, maxElements);
// initially take all permits to stop consumers from modifying queues
// while draining. will restore any excess when done draining.
final int permits = semaphore.drainPermits();
final int numElements = Math.min(maxElements, permits);
int numRemaining = numElements;
for (int i=0; numRemaining > 0 && i < queues.size(); i++) {
numRemaining -= queues.get(i).drainTo(c, numRemaining);
}
return sum;
int drained = numElements - numRemaining;
if (permits > drained) { // restore unused permits.
semaphore.release(permits - drained);
}
return drained;
}
@Override
public int drainTo(Collection<? super E> c) {
int sum = 0;
for (BlockingQueue<E> q : this.queues) {
sum += q.drainTo(c);
}
return sum;
return drainTo(c, Integer.MAX_VALUE);
}
/**