Code cleanup plus optimizations plus javadocs improvements.

Following the JDK optimization, the lock fields are now read into local variables.
Optimized various reads of fields into local variables.
This commit is contained in:
Simone Bordet 2012-12-20 13:26:57 +01:00
parent 26c5eb428f
commit 3fb5af01f2
2 changed files with 355 additions and 368 deletions

View File

@ -21,24 +21,23 @@ package org.eclipse.jetty.util;
import java.util.AbstractList;
import java.util.Collection;
import java.util.NoSuchElementException;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/* ------------------------------------------------------------ */
/** Queue backed by a circular array.
*
* This queue is uses a variant of the two lock queue algorithm to
* provide an efficient queue or list backed by a growable circular
* array. This queue also has a partial implementation of
* {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and
* {@link #poll(long, TimeUnit)} methods.
/**
* A BlockingQueue backed by a circular array capable or growing and shrinking.
* <p/>
* This queue is uses a variant of the two lock queue algorithm to provide an
* efficient queue or list backed by a growable circular array.
* <p/>
* Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is
* able to grow and provides a blocking put call.
* <p>
* <p/>
* The queue has both a capacity (the size of the array currently allocated)
* and a limit (the maximum size that may be allocated), which defaults to
* {@link Integer#MAX_VALUE}.
@ -47,21 +46,25 @@ import java.util.concurrent.locks.ReentrantLock;
*/
public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{
/**
* Default initial capacity, 128.
*/
public final int DEFAULT_CAPACITY = 128;
/**
* Default growth factor, 64.
*/
public final int DEFAULT_GROWTH = 64;
private final int _limit;
private final AtomicInteger _size = new AtomicInteger();
private final int _growCapacity;
private volatile int _capacity;
private Object[] _elements;
private final ReentrantLock _headLock = new ReentrantLock();
private final Lock _headLock = new ReentrantLock();
private final Condition _notEmpty = _headLock.newCondition();
private int _head;
// spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
// TODO verify this has benefits
// Spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
// TODO verify these spacers really prevent false sharing
private long _space0;
private long _space1;
private long _space2;
@ -70,14 +73,14 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
private long _space5;
private long _space6;
private long _space7;
private final ReentrantLock _tailLock = new ReentrantLock();
private final Lock _tailLock = new ReentrantLock();
private int _tail;
/* ------------------------------------------------------------ */
/** Create a growing partially blocking Queue
/**
* Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor.
*
* @see #DEFAULT_CAPACITY
* @see #DEFAULT_GROWTH
*/
public BlockingArrayQueue()
{
@ -87,9 +90,11 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
_limit = Integer.MAX_VALUE;
}
/* ------------------------------------------------------------ */
/** Create a fixed size partially blocking Queue
* @param limit The initial capacity and the limit.
/**
* Creates a bounded {@link BlockingArrayQueue} that does not grow/shrink.
* The capacity of the queue is fixed and equal to the given parameter.
*
* @param limit the maximum capacity
*/
public BlockingArrayQueue(int limit)
{
@ -99,10 +104,11 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
_limit = limit;
}
/* ------------------------------------------------------------ */
/** Create a growing partially blocking Queue.
* @param capacity Initial capacity
* @param growBy Incremental capacity.
/**
* Creates an unbounded {@link BlockingArrayQueue} that grows/shrinks by the given parameter.
*
* @param capacity the initial capacity
* @param growBy the growth/shrink factor
*/
public BlockingArrayQueue(int capacity, int growBy)
{
@ -112,43 +118,48 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
_limit = Integer.MAX_VALUE;
}
/* ------------------------------------------------------------ */
/** Create a growing limited partially blocking Queue.
* @param capacity Initial capacity
* @param growBy Incremental capacity.
* @param limit maximum capacity.
/**
* Create a bounded {@link BlockingArrayQueue} that grows/shrinks by the given parameter.
*
* @param capacity the initial capacity
* @param growBy the growth/shrink factor
* @param limit the maximum capacity
*/
public BlockingArrayQueue(int capacity, int growBy, int limit)
{
if (capacity > limit)
throw new IllegalArgumentException();
_elements = new Object[capacity];
_capacity = _elements.length;
_growCapacity = growBy;
_limit = limit;
}
/* ------------------------------------------------------------ */
/**
* @return the current capacity of this queue
*/
public int getCapacity()
{
return _capacity;
}
/* ------------------------------------------------------------ */
/**
* @return the max capacity of this queue, or -1 if this queue is unbounded
*/
public int getLimit()
{
return _limit;
}
/* ------------------------------------------------------------ */
@Override
public boolean add(E e)
{
return offer(e);
if (offer(e))
return true;
else
throw new IllegalStateException();
}
/* ------------------------------------------------------------ */
public E element()
{
E e = peek();
@ -157,7 +168,6 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
return e;
}
/* ------------------------------------------------------------ */
@SuppressWarnings("unchecked")
public E peek()
{
@ -165,7 +175,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
return null;
E e = null;
_headLock.lock(); // Size cannot shrink
final Lock headLock = _headLock;
headLock.lock(); // Size cannot shrink
try
{
if (_size.get() > 0)
@ -173,29 +184,29 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
finally
{
_headLock.unlock();
headLock.unlock();
}
return e;
}
/* ------------------------------------------------------------ */
public boolean offer(E e)
{
if (e == null)
throw new NullPointerException();
Objects.requireNonNull(e);
boolean not_empty=false;
_tailLock.lock(); // size cannot grow... only shrink
final Lock tailLock = _tailLock;
final Lock headLock = _headLock;
boolean notEmpty = false;
tailLock.lock(); // Size cannot grow... only shrink
try
{
if (_size.get() >= _limit)
int size = _size.get();
if (size >= _limit)
return false;
// should we expand array?
if (_size.get()==_capacity)
// Should we expand array?
if (size == _capacity)
{
_headLock.lock(); // Need to grow array
headLock.lock();
try
{
if (!grow())
@ -203,40 +214,37 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
// add the element
_elements[_tail]=e;
_tail=(_tail+1)%_capacity;
not_empty=0==_size.getAndIncrement();
// Add the element
int tail = _tail;
_elements[tail] = e;
_tail = (tail + 1) % _capacity;
notEmpty = _size.getAndIncrement() == 0;
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
if (not_empty)
if (notEmpty)
{
_headLock.lock();
headLock.lock();
try
{
_notEmpty.signal();
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
return true;
}
/* ------------------------------------------------------------ */
@SuppressWarnings("unchecked")
public E poll()
{
@ -244,7 +252,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
return null;
E e = null;
_headLock.lock(); // Size cannot shrink
final Lock headLock = _headLock;
headLock.lock(); // Size cannot shrink
try
{
if (_size.get() > 0)
@ -253,31 +262,23 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
e = (E)_elements[head];
_elements[head] = null;
_head = (head + 1) % _capacity;
if (_size.decrementAndGet() > 0)
_notEmpty.signal();
}
}
finally
{
_headLock.unlock();
headLock.unlock();
}
return e;
}
/* ------------------------------------------------------------ */
/**
* Retrieves and removes the head of this queue, waiting
* if no elements are present on this queue.
* @return the head of this queue
* @throws InterruptedException if interrupted while waiting.
*/
@SuppressWarnings("unchecked")
public E take() throws InterruptedException
{
E e = null;
_headLock.lockInterruptibly(); // Size cannot shrink
final Lock headLock = _headLock;
headLock.lockInterruptibly(); // Size cannot shrink
try
{
try
@ -303,34 +304,18 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
finally
{
_headLock.unlock();
headLock.unlock();
}
return e;
}
/* ------------------------------------------------------------ */
/**
* Retrieves and removes the head of this queue, waiting
* if necessary up to the specified wait time if no elements are
* present on this queue.
* @param time how long to wait before giving up, in units of
* <tt>unit</tt>
* @param unit a <tt>TimeUnit</tt> determining how to interpret the
* <tt>timeout</tt> parameter
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element is present.
* @throws InterruptedException if interrupted while waiting.
*/
@SuppressWarnings("unchecked")
public E poll(long time, TimeUnit unit) throws InterruptedException
{
E e = null;
long nanos = unit.toNanos(time);
_headLock.lockInterruptibly(); // Size cannot shrink
E e = null;
final Lock headLock = _headLock;
headLock.lockInterruptibly(); // Size cannot shrink
try
{
try
@ -342,28 +327,27 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
nanos = _notEmpty.awaitNanos(nanos);
}
}
catch (InterruptedException ie)
catch (InterruptedException x)
{
_notEmpty.signal();
throw ie;
throw x;
}
e = (E)_elements[_head];
_elements[_head]=null;
_head=(_head+1)%_capacity;
int head = _head;
e = (E)_elements[head];
_elements[head] = null;
_head = (head + 1) % _capacity;
if (_size.decrementAndGet() > 0)
_notEmpty.signal();
}
finally
{
_headLock.unlock();
headLock.unlock();
}
return e;
}
/* ------------------------------------------------------------ */
public E remove()
{
E e = poll();
@ -372,14 +356,15 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
return e;
}
/* ------------------------------------------------------------ */
@Override
public void clear()
{
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
_head = 0;
@ -388,96 +373,93 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public boolean isEmpty()
{
return _size.get()==0;
}
/* ------------------------------------------------------------ */
@Override
public int size()
{
return _size.get();
}
/* ------------------------------------------------------------ */
@SuppressWarnings("unchecked")
@Override
public E get(int index)
{
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
if (index < 0 || index >= _size.get())
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
int i = _head + index;
if (i>=_capacity)
i-=_capacity;
int capacity = _capacity;
if (i >= capacity)
i -= capacity;
return (E)_elements[i];
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
@SuppressWarnings("unchecked")
@Override
public E remove(int index)
{
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
if (index < 0 || index >= _size.get())
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
int i = _head + index;
if (i>=_capacity)
i-=_capacity;
@SuppressWarnings("unchecked")
int capacity = _capacity;
if (i >= capacity)
i -= capacity;
E old = (E)_elements[i];
if (i<_tail)
int tail = _tail;
if (i < tail)
{
System.arraycopy(_elements,i+1,_elements,i,_tail-i);
_tail--;
System.arraycopy(_elements, i + 1, _elements, i, tail - i);
--_tail;
_size.decrementAndGet();
}
else
{
System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
if (_tail>0)
System.arraycopy(_elements, i + 1, _elements, i, capacity - i - 1);
if (tail > 0)
{
_elements[_capacity]=_elements[0];
System.arraycopy(_elements,1,_elements,0,_tail-1);
_tail--;
_elements[capacity] = _elements[0];
System.arraycopy(_elements, 1, _elements, 0, tail - 1);
--_tail;
}
else
_tail=_capacity-1;
{
_tail = capacity - 1;
}
_size.decrementAndGet();
}
@ -485,69 +467,71 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
@SuppressWarnings("unchecked")
@Override
public E set(int index, E e)
{
if (e == null)
throw new NullPointerException();
Objects.requireNonNull(e);
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
if (index < 0 || index >= _size.get())
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
int i = _head + index;
if (i>=_capacity)
i-=_capacity;
@SuppressWarnings("unchecked")
int capacity = _capacity;
if (i >= capacity)
i -= capacity;
E old = (E)_elements[i];
_elements[i] = e;
return old;
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
@Override
public void add(int index, E e)
{
if (e == null)
throw new NullPointerException();
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
final int size = _size.get();
if (index<0 || index>_size.get())
if (index < 0 || index > size)
throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")");
if (index==_size.get())
if (index == size)
{
add(e);
}
@ -558,144 +542,143 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
throw new IllegalStateException("full");
int i = _head + index;
if (i>=_capacity)
i-=_capacity;
int capacity = _capacity;
if (i >= capacity)
i -= capacity;
_size.incrementAndGet();
_tail=(_tail+1)%_capacity;
int tail = _tail;
_tail = tail = (tail + 1) % capacity;
if (i<_tail)
if (i < tail)
{
System.arraycopy(_elements,i,_elements,i+1,_tail-i);
System.arraycopy(_elements, i, _elements, i + 1, tail - i);
_elements[i] = e;
}
else
{
if (_tail>0)
if (tail > 0)
{
System.arraycopy(_elements,0,_elements,1,_tail);
_elements[0]=_elements[_capacity-1];
System.arraycopy(_elements, 0, _elements, 1, tail);
_elements[0] = _elements[capacity - 1];
}
System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
System.arraycopy(_elements, i, _elements, i + 1, capacity - i - 1);
_elements[i] = e;
}
}
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
private boolean grow()
{
if (_growCapacity <= 0)
return false;
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
final int head = _head;
final int tail = _tail;
final int new_tail;
final int newTail;
final int capacity = _capacity;
Object[] elements=new Object[_capacity+_growCapacity];
Object[] elements = new Object[capacity + _growCapacity];
if (head < tail)
{
new_tail=tail-head;
System.arraycopy(_elements,head,elements,0,new_tail);
newTail = tail - head;
System.arraycopy(_elements, head, elements, 0, newTail);
}
else if (head > tail || _size.get() > 0)
{
new_tail=_capacity+tail-head;
int cut=_capacity-head;
newTail = capacity + tail - head;
int cut = capacity - head;
System.arraycopy(_elements, head, elements, 0, cut);
System.arraycopy(_elements, 0, elements, cut, tail);
}
else
{
new_tail=0;
newTail = 0;
}
_elements = elements;
_capacity = _elements.length;
_head = 0;
_tail=new_tail;
_tail = newTail;
return true;
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
}
/* ------------------------------------------------------------ */
public int drainTo(Collection<? super E> c)
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public int drainTo(Collection<? super E> c, int maxElements)
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public void put(E o) throws InterruptedException
{
if (!add(o))
throw new IllegalStateException("full");
}
/* ------------------------------------------------------------ */
public int remainingCapacity()
{
_tailLock.lock();
final Lock tailLock = _tailLock;
tailLock.lock();
try
{
_headLock.lock();
final Lock headLock = _headLock;
headLock.lock();
try
{
return getCapacity() - size();
}
finally
{
_headLock.unlock();
headLock.unlock();
}
}
finally
{
_tailLock.unlock();
tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
// TODO: verify this is not optimized away by the JIT
long sumOfSpace()
{
// this method exists to stop clever optimisers removing the spacers

View File

@ -18,71 +18,67 @@
package org.eclipse.jetty.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.util.HashSet;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
@RunWith(AdvancedRunner.class)
public class BlockingArrayQueueTest
{
@Test
public void testWrap() throws Exception
{
BlockingArrayQueue<String> queue = new BlockingArrayQueue<String>(3);
BlockingArrayQueue<String> queue = new BlockingArrayQueue<>(3);
assertEquals(0,queue.size());
Assert.assertEquals(0, queue.size());
for (int i=0;i<3;i++)
{
queue.offer("one");
assertEquals(1,queue.size());
Assert.assertEquals(1, queue.size());
queue.offer("two");
assertEquals(2,queue.size());
Assert.assertEquals(2, queue.size());
queue.offer("three");
assertEquals(3,queue.size());
Assert.assertEquals(3, queue.size());
assertEquals("one",queue.get(0));
assertEquals("two",queue.get(1));
assertEquals("three",queue.get(2));
Assert.assertEquals("one", queue.get(0));
Assert.assertEquals("two", queue.get(1));
Assert.assertEquals("three", queue.get(2));
assertEquals("[one, two, three]",queue.toString());
Assert.assertEquals("[one, two, three]", queue.toString());
assertEquals("one",queue.poll());
assertEquals(2,queue.size());
Assert.assertEquals("one", queue.poll());
Assert.assertEquals(2, queue.size());
assertEquals("two",queue.poll());
assertEquals(1,queue.size());
Assert.assertEquals("two", queue.poll());
Assert.assertEquals(1, queue.size());
assertEquals("three",queue.poll());
assertEquals(0,queue.size());
Assert.assertEquals("three", queue.poll());
Assert.assertEquals(0, queue.size());
queue.offer("xxx");
assertEquals(1,queue.size());
assertEquals("xxx",queue.poll());
assertEquals(0,queue.size());
Assert.assertEquals(1, queue.size());
Assert.assertEquals("xxx", queue.poll());
Assert.assertEquals(0, queue.size());
}
}
@Test
public void testRemove() throws Exception
{
BlockingArrayQueue<String> queue = new BlockingArrayQueue<String>(3,3);
BlockingArrayQueue<String> queue = new BlockingArrayQueue<>(3,3);
queue.add("0");
queue.add("x");
@ -96,23 +92,36 @@ public class BlockingArrayQueueTest
}
for (int i=0;i<99;i++)
assertEquals(i+"!",queue.get(i));
Assert.assertEquals(i + "!", queue.get(i));
}
@Test
public void testLimit() throws Exception
{
BlockingArrayQueue<String> queue = new BlockingArrayQueue<>(1,0,1);
String element = "0";
Assert.assertTrue(queue.add(element));
Assert.assertFalse(queue.offer("1"));
Assert.assertEquals(element, queue.poll());
Assert.assertTrue(queue.add(element));
}
@Test
public void testGrow() throws Exception
{
BlockingArrayQueue<String> queue = new BlockingArrayQueue<String>(3,2);
assertEquals(3,queue.getCapacity());
BlockingArrayQueue<String> queue = new BlockingArrayQueue<>(3,2);
Assert.assertEquals(3, queue.getCapacity());
queue.add("a");
queue.add("a");
assertEquals(2,queue.size());
assertEquals(3,queue.getCapacity());
Assert.assertEquals(2, queue.size());
Assert.assertEquals(3, queue.getCapacity());
queue.add("a");
queue.add("a");
assertEquals(4,queue.size());
assertEquals(5,queue.getCapacity());
Assert.assertEquals(4, queue.size());
Assert.assertEquals(5, queue.getCapacity());
int s=5;
int c=5;
@ -120,25 +129,25 @@ public class BlockingArrayQueueTest
for (int t=0;t<100;t++)
{
assertEquals(s,queue.size());
assertEquals(c,queue.getCapacity());
Assert.assertEquals(s, queue.size());
Assert.assertEquals(c, queue.getCapacity());
for (int i=queue.size();i-->0;)
queue.poll();
assertEquals(0,queue.size());
assertEquals(c,queue.getCapacity());
Assert.assertEquals(0, queue.size());
Assert.assertEquals(c, queue.getCapacity());
for (int i=queue.getCapacity();i-->0;)
queue.add("a");
queue.add("a");
assertEquals(s+1,queue.size());
assertEquals(c+2,queue.getCapacity());
Assert.assertEquals(s + 1, queue.size());
Assert.assertEquals(c + 2, queue.getCapacity());
queue.poll();
queue.add("a");
queue.add("a");
assertEquals(s+2,queue.size());
assertEquals(c+2,queue.getCapacity());
Assert.assertEquals(s + 2, queue.size());
Assert.assertEquals(c + 2, queue.getCapacity());
s+=2;
c+=2;
@ -151,7 +160,7 @@ public class BlockingArrayQueueTest
{
final String[] data=new String[4];
final BlockingArrayQueue<String> queue = new BlockingArrayQueue<String>();
final BlockingArrayQueue<String> queue = new BlockingArrayQueue<>();
Thread thread = new Thread()
{
@ -168,8 +177,8 @@ public class BlockingArrayQueueTest
}
catch(Exception e)
{
assertTrue(false);
e.printStackTrace();
Assert.fail();
}
}
};
@ -183,15 +192,12 @@ public class BlockingArrayQueueTest
queue.offer("two");
thread.join();
assertEquals("zero",data[0]);
assertEquals("one",data[1]);
assertEquals("two",data[2]);
assertEquals(null,data[3]);
Assert.assertEquals("zero", data[0]);
Assert.assertEquals("one", data[1]);
Assert.assertEquals("two", data[2]);
Assert.assertEquals(null, data[3]);
}
volatile boolean _running;
@Test
@Slow
public void testConcurrentAccess() throws Exception
@ -199,19 +205,17 @@ public class BlockingArrayQueueTest
final int THREADS=50;
final int LOOPS=1000;
final BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<Integer>(1+THREADS*LOOPS);
final BlockingArrayQueue<Integer> queue = new BlockingArrayQueue<>(1+THREADS*LOOPS);
final ConcurrentLinkedQueue<Integer> produced=new ConcurrentLinkedQueue<Integer>();
final ConcurrentLinkedQueue<Integer> consumed=new ConcurrentLinkedQueue<Integer>();
final ConcurrentLinkedQueue<Integer> produced=new ConcurrentLinkedQueue<>();
final ConcurrentLinkedQueue<Integer> consumed=new ConcurrentLinkedQueue<>();
_running=true;
final AtomicBoolean running = new AtomicBoolean(true);
// start consumers
final CyclicBarrier barrier0 = new CyclicBarrier(THREADS+1);
for (int i=0;i<THREADS;i++)
{
final Integer id = new Integer(i);
new Thread()
{
@Override
@ -222,7 +226,7 @@ public class BlockingArrayQueueTest
setPriority(getPriority()-1);
try
{
while(_running)
while(running.get())
{
int r=1+random.nextInt(10);
if (r%2==0)
@ -267,7 +271,7 @@ public class BlockingArrayQueueTest
final CyclicBarrier barrier1 = new CyclicBarrier(THREADS+1);
for (int i=0;i<THREADS;i++)
{
final Integer id = new Integer(i);
final int id = i;
new Thread()
{
@Override
@ -278,7 +282,7 @@ public class BlockingArrayQueueTest
{
for (int j=0;j<LOOPS;j++)
{
Integer msg = new Integer(random.nextInt());
Integer msg = random.nextInt();
produced.add(msg);
if (!queue.offer(msg))
throw new Exception(id+" FULL! "+queue.size());
@ -313,12 +317,12 @@ public class BlockingArrayQueueTest
Thread.sleep(500);
size=queue.size();
}
_running=false;
running.set(false);
barrier0.await();
HashSet<Integer> prodSet = new HashSet<Integer>(produced);
HashSet<Integer> consSet = new HashSet<Integer>(consumed);
HashSet<Integer> prodSet = new HashSet<>(produced);
HashSet<Integer> consSet = new HashSet<>(consumed);
assertEquals(prodSet,consSet);
Assert.assertEquals(prodSet, consSet);
}
}