diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java index b525136ffca..09531d14ea3 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/util/BoundedPriorityBlockingQueue.java @@ -245,7 +245,7 @@ public class BoundedPriorityBlockingQueue extends AbstractQueue implements E result = null; try { while (queue.size() == 0 && nanos > 0) { - notEmpty.awaitNanos(nanos); + nanos = notEmpty.awaitNanos(nanos); } if (queue.size() > 0) { result = queue.poll(); diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java index f09c79cfba4..93fd2dbef74 100644 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java +++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/util/TestBoundedPriorityBlockingQueue.java @@ -18,10 +18,16 @@ package org.apache.hadoop.hbase.util; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertFalse; import java.util.Comparator; +import java.util.PriorityQueue; +import java.util.concurrent.CyclicBarrier; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import org.apache.hadoop.hbase.SmallTests; @@ -175,4 +181,56 @@ public class TestBoundedPriorityBlockingQueue { } assertEquals(null, queue.poll()); } + + @Test + public void testPoll() { + assertNull(queue.poll()); + PriorityQueue testList = new PriorityQueue(CAPACITY, new TestObjectComparator()); + + for (int i = 0; i < CAPACITY; ++i) { + TestObject obj = new TestObject(i, i); + testList.add(obj); + queue.offer(obj); + } + + for (int i = 0; i < CAPACITY; ++i) { + assertEquals(testList.poll(), queue.poll()); + } + + assertNull(null, queue.poll()); + } + + @Test(timeout=10000) + public void testPollInExecutor() throws InterruptedException { + final TestObject testObj = new TestObject(0, 0); + + final CyclicBarrier threadsStarted = new CyclicBarrier(2); + ExecutorService executor = Executors.newFixedThreadPool(2); + executor.execute(new Runnable() { + public void run() { + try { + assertNull(queue.poll(1000, TimeUnit.MILLISECONDS)); + threadsStarted.await(); + assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS)); + assertTrue(queue.isEmpty()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + executor.execute(new Runnable() { + public void run() { + try { + threadsStarted.await(); + queue.offer(testObj); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + }); + + executor.shutdown(); + assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS)); + } }