diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index 20161b8e28f..6d9ea3e72e4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -122,13 +122,15 @@ public class FairCallQueue extends AbstractQueue private E removeNextElement() { int priority = multiplexer.getAndAdvanceCurrentIndex(); E e = queues.get(priority).poll(); - if (e == null) { + // a semaphore permit has been acquired, so an element MUST be extracted + // or the semaphore and queued elements will go out of sync. loop to + // avoid race condition if elements are added behind the current position, + // awakening other threads that poll the elements ahead of our position. + while (e == null) { for (int idx = 0; e == null && idx < queues.size(); idx++) { e = queues.get(idx).poll(); } } - // guaranteed to find an element if caller acquired permit. - assert e != null : "consumer didn't acquire semaphore!"; return e; }