Fixed signalling in ConcurrentArrayBlockingQueue.

This commit is contained in:
Simone Bordet 2013-03-14 00:09:44 +01:00
parent 4a8b7409af
commit f0a9f9caa0
2 changed files with 84 additions and 51 deletions

View File

@ -32,9 +32,9 @@ import java.util.concurrent.locks.ReentrantLock;
*
* @see Unbounded
* @see Bounded
* @param <T>
* @param <E>
*/
public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQueue<T> implements BlockingQueue<T>
public abstract class ConcurrentArrayBlockingQueue<E> extends ConcurrentArrayQueue<E> implements BlockingQueue<E>
{
private final Lock _lock = new ReentrantLock();
private final Condition _consumer = _lock.newCondition();
@ -45,29 +45,25 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
@Override
public T poll()
public E poll()
{
T result = super.poll();
E result = super.poll();
if (result != null && decrementAndGetSize() > 0)
signalProducer();
signalConsumer();
return result;
}
protected abstract int decrementAndGetSize();
protected abstract void signalProducer();
protected abstract void signalProducers();
@Override
public boolean remove(Object o)
{
boolean result = super.remove(o);
if (result && decrementAndGetSize() > 0)
signalProducer();
signalConsumer();
return result;
}
protected abstract int decrementAndGetSize();
protected void signalConsumer()
{
final Lock lock = _lock;
@ -83,11 +79,11 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
@Override
public T take() throws InterruptedException
public E take() throws InterruptedException
{
while (true)
{
T result = poll();
E result = poll();
if (result != null)
return result;
@ -108,12 +104,12 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
@Override
public T poll(long timeout, TimeUnit unit) throws InterruptedException
public E poll(long timeout, TimeUnit unit) throws InterruptedException
{
long nanos = unit.toNanos(timeout);
while (true)
{
T result = poll();
E result = poll();
if (result != null)
return result;
@ -123,9 +119,9 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
{
if (size() == 0)
{
nanos = _consumer.awaitNanos(nanos);
if (nanos <= 0)
return null;
nanos = _consumer.awaitNanos(nanos);
}
}
finally
@ -136,13 +132,13 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
@Override
public int drainTo(Collection<? super T> c)
public int drainTo(Collection<? super E> c)
{
return drainTo(c, Integer.MAX_VALUE);
}
@Override
public int drainTo(Collection<? super T> c, int maxElements)
public int drainTo(Collection<? super E> c, int maxElements)
{
if (c == this)
throw new IllegalArgumentException();
@ -150,26 +146,15 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
int added = 0;
while (added < maxElements)
{
T element = poll();
E element = poll();
if (element == null)
break;
c.add(element);
++added;
}
if (added > 0)
signalProducers();
return added;
}
@Override
public void clear()
{
super.clear();
signalProducers();
}
/**
* An unbounded, blocking version of {@link ConcurrentArrayQueue}.
*
@ -243,18 +228,6 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
{
throw new UnsupportedOperationException();
}
@Override
protected void signalProducer()
{
// Blocking put() and offer() not implemented, no need to signal them
}
@Override
protected void signalProducers()
{
// Blocking put() and offer() not implemented, no need to signal them
}
}
/**
@ -283,9 +256,9 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
@Override
public boolean offer(E item)
{
int size = size();
while (true)
{
int size = size();
int nextSize = size + 1;
if (nextSize > _capacity)
@ -301,12 +274,30 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
else
{
decrementAndGetSize();
size = decrementAndGetSize();
}
}
}
}
@Override
public E poll()
{
E result = super.poll();
if (result != null)
signalProducer();
return result;
}
@Override
public boolean remove(Object o)
{
boolean result = super.remove(o);
if (result)
signalProducer();
return result;
}
@Override
protected int decrementAndGetSize()
{
@ -362,9 +353,9 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
{
if (size() == _capacity)
{
nanos = _producer.awaitNanos(nanos);
if (nanos <= 0)
return false;
nanos = _producer.awaitNanos(nanos);
}
}
finally
@ -379,7 +370,22 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
@Override
protected void signalProducer()
public int drainTo(Collection<? super E> c, int maxElements)
{
int result = super.drainTo(c, maxElements);
if (result > 0)
signalProducers();
return result;
}
@Override
public void clear()
{
super.clear();
signalProducers();
}
private void signalProducer()
{
final Lock lock = _lock;
lock.lock();
@ -393,8 +399,7 @@ public abstract class ConcurrentArrayBlockingQueue<T> extends ConcurrentArrayQue
}
}
@Override
protected void signalProducers()
private void signalProducers()
{
final Lock lock = _lock;
lock.lock();

View File

@ -49,8 +49,36 @@ public class ConcurrentArrayBlockingQueueUnboundedTest extends ConcurrentArrayQu
ConcurrentArrayBlockingQueue<Integer> queue = newConcurrentArrayQueue(32);
Integer item = 1;
Assert.assertTrue(queue.offer(item));
Integer taken = queue.take();
Assert.assertSame(item, taken);
Integer result = queue.take();
Assert.assertSame(item, result);
}
@Test
public void testTimedPollOffer() throws Exception
{
final ConcurrentArrayBlockingQueue<Integer> queue = newConcurrentArrayQueue(32);
final long timeout = 1000;
final Integer item = 1;
new Thread()
{
@Override
public void run()
{
try
{
TimeUnit.MILLISECONDS.sleep(timeout);
queue.offer(item);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
}.start();
Integer result = queue.poll(2 * timeout, TimeUnit.MILLISECONDS);
Assert.assertNotNull(result);
}
@Test