412940 minor threadsafe fixes

hold lock for accessing elements size
This commit is contained in:
Greg Wilkins 2013-07-15 17:00:51 +10:00
parent 8d49f69aa4
commit e758e0111b
1 changed files with 125 additions and 121 deletions

View File

@ -34,29 +34,25 @@ import java.util.concurrent.locks.ReentrantLock;
/** /**
* A BlockingQueue backed by a circular array capable or growing. * A BlockingQueue backed by a circular array capable or growing.
* <p/> * <p/>
* This queue is uses a variant of the two lock queue algorithm to provide an * This queue is uses a variant of the two lock queue algorithm to provide an efficient queue or list backed by a growable circular array.
* efficient queue or list backed by a growable circular array.
* <p/> * <p/>
* Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is * Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is able to grow and provides a blocking put call.
* able to grow and provides a blocking put call.
* <p/> * <p/>
* The queue has both a capacity (the size of the array currently allocated) * The queue has both a capacity (the size of the array currently allocated) and a max capacity (the maximum size that may be allocated), which defaults to
* and a max capacity (the maximum size that may be allocated), which defaults to
* {@link Integer#MAX_VALUE}. * {@link Integer#MAX_VALUE}.
* *
* @param <E> The element type * @param <E>
* The element type
*/ */
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E> public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{ {
/** /**
* The head offset in the {@link #_indexes} array, displaced * The head offset in the {@link #_indexes} array, displaced by 15 slots to avoid false sharing with the array length (stored before the first element of
* by 15 slots to avoid false sharing with the array length * the array itself).
* (stored before the first element of the array itself).
*/ */
private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1; private static final int HEAD_OFFSET = MemoryUtils.getIntegersPerCacheLine() - 1;
/** /**
* The tail offset in the {@link #_indexes} array, displaced * The tail offset in the {@link #_indexes} array, displaced by 16 slots from the head to avoid false sharing with it.
* by 16 slots from the head to avoid false sharing with it.
*/ */
private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine(); private static final int TAIL_OFFSET = HEAD_OFFSET + MemoryUtils.getIntegersPerCacheLine();
/** /**
@ -94,10 +90,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
/** /**
* Creates a bounded {@link BlockingArrayQueue} that does not grow. * Creates a bounded {@link BlockingArrayQueue} that does not grow. The capacity of the queue is fixed and equal to the given parameter.
* The capacity of the queue is fixed and equal to the given parameter.
* *
* @param maxCapacity the maximum capacity * @param maxCapacity
* the maximum capacity
*/ */
public BlockingArrayQueue(int maxCapacity) public BlockingArrayQueue(int maxCapacity)
{ {
@ -109,8 +105,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
/** /**
* Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter. * Creates an unbounded {@link BlockingArrayQueue} that grows by the given parameter.
* *
* @param capacity the initial capacity * @param capacity
* @param growBy the growth factor * the initial capacity
* @param growBy
* the growth factor
*/ */
public BlockingArrayQueue(int capacity, int growBy) public BlockingArrayQueue(int capacity, int growBy)
{ {
@ -122,9 +120,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
/** /**
* Create a bounded {@link BlockingArrayQueue} that grows by the given parameter. * Create a bounded {@link BlockingArrayQueue} that grows by the given parameter.
* *
* @param capacity the initial capacity * @param capacity
* @param growBy the growth factor * the initial capacity
* @param maxCapacity the maximum capacity * @param growBy
* the growth factor
* @param maxCapacity
* the maximum capacity
*/ */
public BlockingArrayQueue(int capacity, int growBy, int maxCapacity) public BlockingArrayQueue(int capacity, int growBy, int maxCapacity)
{ {
@ -142,12 +143,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
@Override @Override
public void clear() public void clear()
{ {
final Lock tailLock = _tailLock;
tailLock.lock(); _tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
_indexes[HEAD_OFFSET] = 0; _indexes[HEAD_OFFSET] = 0;
@ -156,12 +157,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@ -189,8 +190,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
return null; return null;
E e = null; E e = null;
final Lock headLock = _headLock;
headLock.lock(); // Size cannot shrink _headLock.lock(); // Size cannot shrink
try try
{ {
if (_size.get() > 0) if (_size.get() > 0)
@ -205,7 +206,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
return e; return e;
} }
@ -218,8 +219,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
return null; return null;
E e = null; E e = null;
final Lock headLock = _headLock;
headLock.lock(); // Size cannot shrink _headLock.lock(); // Size cannot shrink
try try
{ {
if (_size.get() > 0) if (_size.get() > 0)
@ -227,7 +228,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
return e; return e;
} }
@ -259,10 +260,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
{ {
Objects.requireNonNull(e); Objects.requireNonNull(e);
final Lock tailLock = _tailLock;
final Lock headLock = _headLock;
boolean notEmpty = false; boolean notEmpty = false;
tailLock.lock(); // Size cannot grow... only shrink _tailLock.lock(); // Size cannot grow... only shrink
try try
{ {
int size = _size.get(); int size = _size.get();
@ -272,7 +271,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
// Should we expand array? // Should we expand array?
if (size == _elements.length) if (size == _elements.length)
{ {
headLock.lock(); _headLock.lock();
try try
{ {
if (!grow()) if (!grow())
@ -280,7 +279,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
@ -292,19 +291,19 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
if (notEmpty) if (notEmpty)
{ {
headLock.lock(); _headLock.lock();
try try
{ {
_notEmpty.signal(); _notEmpty.signal();
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
@ -339,8 +338,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
public E take() throws InterruptedException public E take() throws InterruptedException
{ {
E e = null; E e = null;
final Lock headLock = _headLock;
headLock.lockInterruptibly(); // Size cannot shrink _headLock.lockInterruptibly(); // Size cannot shrink
try try
{ {
try try
@ -366,7 +365,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
return e; return e;
} }
@ -377,8 +376,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
{ {
long nanos = unit.toNanos(time); long nanos = unit.toNanos(time);
E e = null; E e = null;
final Lock headLock = _headLock;
headLock.lockInterruptibly(); // Size cannot shrink _headLock.lockInterruptibly(); // Size cannot shrink
try try
{ {
try try
@ -406,7 +405,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
return e; return e;
} }
@ -414,12 +413,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
@Override @Override
public boolean remove(Object o) public boolean remove(Object o)
{ {
final Lock tailLock = _tailLock;
tailLock.lock(); _tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
if (isEmpty()) if (isEmpty())
@ -446,36 +445,36 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@Override @Override
public int remainingCapacity() public int remainingCapacity()
{ {
final Lock tailLock = _tailLock;
tailLock.lock(); _tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
return getCapacity() - size(); return getCapacity() - size();
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@ -499,12 +498,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
@Override @Override
public E get(int index) public E get(int index)
{ {
final Lock tailLock = _tailLock;
tailLock.lock(); _tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
if (index < 0 || index >= _size.get()) if (index < 0 || index >= _size.get())
@ -517,12 +516,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@ -532,12 +531,11 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
if (e == null) if (e == null)
throw new NullPointerException(); throw new NullPointerException();
final Lock tailLock = _tailLock; _tailLock.lock();
tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
final int size = _size.get(); final int size = _size.get();
@ -586,12 +584,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@ -601,12 +599,11 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
{ {
Objects.requireNonNull(e); Objects.requireNonNull(e);
final Lock tailLock = _tailLock; _tailLock.lock();
tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
if (index < 0 || index >= _size.get()) if (index < 0 || index >= _size.get())
@ -622,12 +619,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@ -635,12 +632,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
@Override @Override
public E remove(int index) public E remove(int index)
{ {
final Lock tailLock = _tailLock;
tailLock.lock(); _tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
if (index < 0 || index >= _size.get()) if (index < 0 || index >= _size.get())
@ -680,24 +677,24 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@Override @Override
public ListIterator<E> listIterator(int index) public ListIterator<E> listIterator(int index)
{ {
final Lock tailLock = _tailLock;
tailLock.lock(); _tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
Object[] elements = new Object[size()]; Object[] elements = new Object[size()];
@ -720,12 +717,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }
@ -737,9 +734,17 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
* @return the current capacity of this queue * @return the current capacity of this queue
*/ */
public int getCapacity() public int getCapacity()
{
_tailLock.lock();
try
{ {
return _elements.length; return _elements.length;
} }
finally
{
_tailLock.unlock();
}
}
/** /**
* @return the max capacity of this queue, or -1 if this queue is unbounded * @return the max capacity of this queue, or -1 if this queue is unbounded
@ -758,12 +763,11 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
if (_growCapacity <= 0) if (_growCapacity <= 0)
return false; return false;
final Lock tailLock = _tailLock; _tailLock.lock();
tailLock.lock();
try try
{ {
final Lock headLock = _headLock;
headLock.lock(); _headLock.lock();
try try
{ {
final int head = _indexes[HEAD_OFFSET]; final int head = _indexes[HEAD_OFFSET];
@ -797,12 +801,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
} }
finally finally
{ {
headLock.unlock(); _headLock.unlock();
} }
} }
finally finally
{ {
tailLock.unlock(); _tailLock.unlock();
} }
} }