HBASE-11139 BoundedPriorityBlockingQueue#poll() should check the return value from awaitNanos() (Shengzhe Yao)

git-svn-id: https://svn.apache.org/repos/asf/hbase/trunk@1593854 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
mbertozzi 2014-05-11 20:36:50 +00:00
parent 88f8f755c8
commit bbde026ebc
2 changed files with 59 additions and 1 deletions

View File

@ -245,7 +245,7 @@ public class BoundedPriorityBlockingQueue<E> extends AbstractQueue<E> implements
E result = null; E result = null;
try { try {
while (queue.size() == 0 && nanos > 0) { while (queue.size() == 0 && nanos > 0) {
notEmpty.awaitNanos(nanos); nanos = notEmpty.awaitNanos(nanos);
} }
if (queue.size() > 0) { if (queue.size() > 0) {
result = queue.poll(); result = queue.poll();

View File

@ -18,10 +18,16 @@
package org.apache.hadoop.hbase.util; package org.apache.hadoop.hbase.util;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertFalse;
import java.util.Comparator; import java.util.Comparator;
import java.util.PriorityQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.SmallTests; import org.apache.hadoop.hbase.SmallTests;
@ -175,4 +181,56 @@ public class TestBoundedPriorityBlockingQueue {
} }
assertEquals(null, queue.poll()); assertEquals(null, queue.poll());
} }
@Test
public void testPoll() {
assertNull(queue.poll());
PriorityQueue<TestObject> testList = new PriorityQueue<TestObject>(CAPACITY, new TestObjectComparator());
for (int i = 0; i < CAPACITY; ++i) {
TestObject obj = new TestObject(i, i);
testList.add(obj);
queue.offer(obj);
}
for (int i = 0; i < CAPACITY; ++i) {
assertEquals(testList.poll(), queue.poll());
}
assertNull(null, queue.poll());
}
@Test(timeout=10000)
public void testPollInExecutor() throws InterruptedException {
final TestObject testObj = new TestObject(0, 0);
final CyclicBarrier threadsStarted = new CyclicBarrier(2);
ExecutorService executor = Executors.newFixedThreadPool(2);
executor.execute(new Runnable() {
public void run() {
try {
assertNull(queue.poll(1000, TimeUnit.MILLISECONDS));
threadsStarted.await();
assertSame(testObj, queue.poll(1000, TimeUnit.MILLISECONDS));
assertTrue(queue.isEmpty());
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
executor.execute(new Runnable() {
public void run() {
try {
threadsStarted.await();
queue.offer(testObj);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
});
executor.shutdown();
assertTrue(executor.awaitTermination(8000, TimeUnit.MILLISECONDS));
}
} }