diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java index bc436770572..c050059fc94 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/BlockingArrayQueue.java @@ -21,47 +21,50 @@ package org.eclipse.jetty.util; import java.util.AbstractList; import java.util.Collection; import java.util.NoSuchElementException; +import java.util.Objects; import java.util.concurrent.BlockingQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; - -/* ------------------------------------------------------------ */ -/** Queue backed by a circular array. - * - * This queue is uses a variant of the two lock queue algorithm to - * provide an efficient queue or list backed by a growable circular - * array. This queue also has a partial implementation of - * {@link java.util.concurrent.BlockingQueue}, specifically the {@link #take()} and - * {@link #poll(long, TimeUnit)} methods. +/** + * A BlockingQueue backed by a circular array capable or growing and shrinking. + *

+ * This queue is uses a variant of the two lock queue algorithm to provide an + * efficient queue or list backed by a growable circular array. + *

* Unlike {@link java.util.concurrent.ArrayBlockingQueue}, this class is * able to grow and provides a blocking put call. - *

+ *

* The queue has both a capacity (the size of the array currently allocated) - * and a limit (the maximum size that may be allocated), which defaults to + * and a limit (the maximum size that may be allocated), which defaults to * {@link Integer#MAX_VALUE}. - * + * * @param The element type */ public class BlockingArrayQueue extends AbstractList implements BlockingQueue { - public final int DEFAULT_CAPACITY=128; - public final int DEFAULT_GROWTH=64; + /** + * Default initial capacity, 128. + */ + public final int DEFAULT_CAPACITY = 128; + /** + * Default growth factor, 64. + */ + public final int DEFAULT_GROWTH = 64; + private final int _limit; - private final AtomicInteger _size=new AtomicInteger(); + private final AtomicInteger _size = new AtomicInteger(); private final int _growCapacity; - private volatile int _capacity; private Object[] _elements; - - private final ReentrantLock _headLock = new ReentrantLock(); + private final Lock _headLock = new ReentrantLock(); private final Condition _notEmpty = _headLock.newCondition(); private int _head; - - // spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing - // TODO verify this has benefits + // Spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing + // TODO verify these spacers really prevent false sharing private long _space0; private long _space1; private long _space2; @@ -70,132 +73,140 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu private long _space5; private long _space6; private long _space7; - - private final ReentrantLock _tailLock = new ReentrantLock(); + private final Lock _tailLock = new ReentrantLock(); private int _tail; - - /* ------------------------------------------------------------ */ - /** Create a growing partially blocking Queue - * + /** + * Creates an unbounded {@link BlockingArrayQueue} with default initial capacity and grow factor. + * + * @see #DEFAULT_CAPACITY + * @see #DEFAULT_GROWTH */ public BlockingArrayQueue() { - _elements=new Object[DEFAULT_CAPACITY]; - _growCapacity=DEFAULT_GROWTH; - _capacity=_elements.length; - _limit=Integer.MAX_VALUE; + _elements = new Object[DEFAULT_CAPACITY]; + _growCapacity = DEFAULT_GROWTH; + _capacity = _elements.length; + _limit = Integer.MAX_VALUE; } - /* ------------------------------------------------------------ */ - /** Create a fixed size partially blocking Queue - * @param limit The initial capacity and the limit. + /** + * Creates a bounded {@link BlockingArrayQueue} that does not grow/shrink. + * The capacity of the queue is fixed and equal to the given parameter. + * + * @param limit the maximum capacity */ public BlockingArrayQueue(int limit) { - _elements=new Object[limit]; - _capacity=_elements.length; - _growCapacity=-1; - _limit=limit; + _elements = new Object[limit]; + _capacity = _elements.length; + _growCapacity = -1; + _limit = limit; } - /* ------------------------------------------------------------ */ - /** Create a growing partially blocking Queue. - * @param capacity Initial capacity - * @param growBy Incremental capacity. + /** + * Creates an unbounded {@link BlockingArrayQueue} that grows/shrinks by the given parameter. + * + * @param capacity the initial capacity + * @param growBy the growth/shrink factor */ - public BlockingArrayQueue(int capacity,int growBy) + public BlockingArrayQueue(int capacity, int growBy) { - _elements=new Object[capacity]; - _capacity=_elements.length; - _growCapacity=growBy; - _limit=Integer.MAX_VALUE; + _elements = new Object[capacity]; + _capacity = _elements.length; + _growCapacity = growBy; + _limit = Integer.MAX_VALUE; } - /* ------------------------------------------------------------ */ - /** Create a growing limited partially blocking Queue. - * @param capacity Initial capacity - * @param growBy Incremental capacity. - * @param limit maximum capacity. + /** + * Create a bounded {@link BlockingArrayQueue} that grows/shrinks by the given parameter. + * + * @param capacity the initial capacity + * @param growBy the growth/shrink factor + * @param limit the maximum capacity */ - public BlockingArrayQueue(int capacity,int growBy,int limit) + public BlockingArrayQueue(int capacity, int growBy, int limit) { - if (capacity>limit) + if (capacity > limit) throw new IllegalArgumentException(); - - _elements=new Object[capacity]; - _capacity=_elements.length; - _growCapacity=growBy; - _limit=limit; + _elements = new Object[capacity]; + _capacity = _elements.length; + _growCapacity = growBy; + _limit = limit; } - /* ------------------------------------------------------------ */ + /** + * @return the current capacity of this queue + */ public int getCapacity() { return _capacity; } - /* ------------------------------------------------------------ */ + /** + * @return the max capacity of this queue, or -1 if this queue is unbounded + */ public int getLimit() { return _limit; } - - /* ------------------------------------------------------------ */ + @Override public boolean add(E e) { - return offer(e); + if (offer(e)) + return true; + else + throw new IllegalStateException(); } - - /* ------------------------------------------------------------ */ + public E element() { E e = peek(); - if (e==null) + if (e == null) throw new NoSuchElementException(); return e; } - - /* ------------------------------------------------------------ */ + @SuppressWarnings("unchecked") public E peek() { if (_size.get() == 0) return null; - + E e = null; - _headLock.lock(); // Size cannot shrink - try + final Lock headLock = _headLock; + headLock.lock(); // Size cannot shrink + try { - if (_size.get() > 0) + if (_size.get() > 0) e = (E)_elements[_head]; - } - finally - { - _headLock.unlock(); } - + finally + { + headLock.unlock(); + } return e; } - /* ------------------------------------------------------------ */ public boolean offer(E e) { - if (e == null) - throw new NullPointerException(); - - boolean not_empty=false; - _tailLock.lock(); // size cannot grow... only shrink - try + Objects.requireNonNull(e); + + final Lock tailLock = _tailLock; + final Lock headLock = _headLock; + boolean notEmpty = false; + tailLock.lock(); // Size cannot grow... only shrink + try { - if (_size.get() >= _limit) + int size = _size.get(); + if (size >= _limit) return false; - - // should we expand array? - if (_size.get()==_capacity) + + // Should we expand array? + if (size == _capacity) { - _headLock.lock(); // Need to grow array + headLock.lock(); try { if (!grow()) @@ -203,281 +214,252 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu } finally { - _headLock.unlock(); + headLock.unlock(); } } - // add the element - _elements[_tail]=e; - _tail=(_tail+1)%_capacity; - - not_empty=0==_size.getAndIncrement(); - - } - finally - { - _tailLock.unlock(); + // Add the element + int tail = _tail; + _elements[tail] = e; + _tail = (tail + 1) % _capacity; + notEmpty = _size.getAndIncrement() == 0; } - - if (not_empty) + finally { - _headLock.lock(); + tailLock.unlock(); + } + + if (notEmpty) + { + headLock.lock(); try { _notEmpty.signal(); } finally { - _headLock.unlock(); + headLock.unlock(); } - } + } return true; } - - /* ------------------------------------------------------------ */ @SuppressWarnings("unchecked") public E poll() { if (_size.get() == 0) return null; - + E e = null; - _headLock.lock(); // Size cannot shrink - try + final Lock headLock = _headLock; + headLock.lock(); // Size cannot shrink + try { - if (_size.get() > 0) + if (_size.get() > 0) { - final int head=_head; + final int head = _head; e = (E)_elements[head]; - _elements[head]=null; - _head=(head+1)%_capacity; - - if (_size.decrementAndGet()>0) + _elements[head] = null; + _head = (head + 1) % _capacity; + if (_size.decrementAndGet() > 0) _notEmpty.signal(); } - } - finally - { - _headLock.unlock(); } - + finally + { + headLock.unlock(); + } return e; } - /* ------------------------------------------------------------ */ - /** - * Retrieves and removes the head of this queue, waiting - * if no elements are present on this queue. - * @return the head of this queue - * @throws InterruptedException if interrupted while waiting. - */ @SuppressWarnings("unchecked") public E take() throws InterruptedException { E e = null; - _headLock.lockInterruptibly(); // Size cannot shrink - try + final Lock headLock = _headLock; + headLock.lockInterruptibly(); // Size cannot shrink + try { - try + try { while (_size.get() == 0) { _notEmpty.await(); } - } - catch (InterruptedException ie) + } + catch (InterruptedException ie) { _notEmpty.signal(); throw ie; } - final int head=_head; + final int head = _head; e = (E)_elements[head]; - _elements[head]=null; - _head=(head+1)%_capacity; + _elements[head] = null; + _head = (head + 1) % _capacity; - if (_size.decrementAndGet()>0) + if (_size.decrementAndGet() > 0) _notEmpty.signal(); - } - finally - { - _headLock.unlock(); } - + finally + { + headLock.unlock(); + } return e; } - /* ------------------------------------------------------------ */ - /** - * Retrieves and removes the head of this queue, waiting - * if necessary up to the specified wait time if no elements are - * present on this queue. - * @param time how long to wait before giving up, in units of - * unit - * @param unit a TimeUnit determining how to interpret the - * timeout parameter - * @return the head of this queue, or null if the - * specified waiting time elapses before an element is present. - * @throws InterruptedException if interrupted while waiting. - */ @SuppressWarnings("unchecked") public E poll(long time, TimeUnit unit) throws InterruptedException { - - E e = null; - long nanos = unit.toNanos(time); - - _headLock.lockInterruptibly(); // Size cannot shrink - try - { - try + E e = null; + final Lock headLock = _headLock; + headLock.lockInterruptibly(); // Size cannot shrink + try + { + try { while (_size.get() == 0) { - if (nanos<=0) + if (nanos <= 0) return null; nanos = _notEmpty.awaitNanos(nanos); } - } - catch (InterruptedException ie) + } + catch (InterruptedException x) { _notEmpty.signal(); - throw ie; + throw x; } - e = (E)_elements[_head]; - _elements[_head]=null; - _head=(_head+1)%_capacity; + int head = _head; + e = (E)_elements[head]; + _elements[head] = null; + _head = (head + 1) % _capacity; - if (_size.decrementAndGet()>0) + if (_size.decrementAndGet() > 0) _notEmpty.signal(); - } - finally - { - _headLock.unlock(); } - + finally + { + headLock.unlock(); + } return e; } - /* ------------------------------------------------------------ */ public E remove() { - E e=poll(); - if (e==null) + E e = poll(); + if (e == null) throw new NoSuchElementException(); return e; } - /* ------------------------------------------------------------ */ @Override public void clear() { - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { - _head=0; - _tail=0; + _head = 0; + _tail = 0; _size.set(0); } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } } - /* ------------------------------------------------------------ */ - @Override - public boolean isEmpty() - { - return _size.get()==0; - } - - /* ------------------------------------------------------------ */ @Override public int size() { return _size.get(); } - /* ------------------------------------------------------------ */ @SuppressWarnings("unchecked") @Override public E get(int index) { - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { - if (index<0 || index>=_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); - int i = _head+index; - if (i>=_capacity) - i-=_capacity; + if (index < 0 || index >= _size.get()) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); + int i = _head + index; + int capacity = _capacity; + if (i >= capacity) + i -= capacity; return (E)_elements[i]; } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } } - - /* ------------------------------------------------------------ */ + + @SuppressWarnings("unchecked") @Override public E remove(int index) { - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { + if (index < 0 || index >= _size.get()) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); - if (index<0 || index>=_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); + int i = _head + index; + int capacity = _capacity; + if (i >= capacity) + i -= capacity; + E old = (E)_elements[i]; - int i = _head+index; - if (i>=_capacity) - i-=_capacity; - @SuppressWarnings("unchecked") - E old=(E)_elements[i]; - - if (i<_tail) + int tail = _tail; + if (i < tail) { - System.arraycopy(_elements,i+1,_elements,i,_tail-i); - _tail--; + System.arraycopy(_elements, i + 1, _elements, i, tail - i); + --_tail; _size.decrementAndGet(); } else { - System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1); - if (_tail>0) + System.arraycopy(_elements, i + 1, _elements, i, capacity - i - 1); + if (tail > 0) { - _elements[_capacity]=_elements[0]; - System.arraycopy(_elements,1,_elements,0,_tail-1); - _tail--; + _elements[capacity] = _elements[0]; + System.arraycopy(_elements, 1, _elements, 0, tail - 1); + --_tail; } else - _tail=_capacity-1; - + { + _tail = capacity - 1; + } _size.decrementAndGet(); } @@ -485,220 +467,221 @@ public class BlockingArrayQueue extends AbstractList implements BlockingQu } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } } - /* ------------------------------------------------------------ */ + @SuppressWarnings("unchecked") @Override public E set(int index, E e) { - if (e == null) - throw new NullPointerException(); + Objects.requireNonNull(e); - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { + if (index < 0 || index >= _size.get()) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); - if (index<0 || index>=_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); - - int i = _head+index; - if (i>=_capacity) - i-=_capacity; - @SuppressWarnings("unchecked") - E old=(E)_elements[i]; - _elements[i]=e; + int i = _head + index; + int capacity = _capacity; + if (i >= capacity) + i -= capacity; + E old = (E)_elements[i]; + _elements[i] = e; return old; } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } } - - /* ------------------------------------------------------------ */ + @Override public void add(int index, E e) { - if (e == null) + if (e == null) throw new NullPointerException(); - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { + final int size = _size.get(); - if (index<0 || index>_size.get()) - throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); + if (index < 0 || index > size) + throw new IndexOutOfBoundsException("!(" + 0 + "<" + index + "<=" + _size + ")"); - if (index==_size.get()) + if (index == size) { add(e); } else { - if (_tail==_head) + if (_tail == _head) if (!grow()) throw new IllegalStateException("full"); - int i = _head+index; - if (i>=_capacity) - i-=_capacity; + int i = _head + index; + int capacity = _capacity; + + if (i >= capacity) + i -= capacity; _size.incrementAndGet(); - _tail=(_tail+1)%_capacity; + int tail = _tail; + _tail = tail = (tail + 1) % capacity; - - if (i<_tail) + if (i < tail) { - System.arraycopy(_elements,i,_elements,i+1,_tail-i); - _elements[i]=e; + System.arraycopy(_elements, i, _elements, i + 1, tail - i); + _elements[i] = e; } else { - if (_tail>0) + if (tail > 0) { - System.arraycopy(_elements,0,_elements,1,_tail); - _elements[0]=_elements[_capacity-1]; + System.arraycopy(_elements, 0, _elements, 1, tail); + _elements[0] = _elements[capacity - 1]; } - System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1); - _elements[i]=e; + System.arraycopy(_elements, i, _elements, i + 1, capacity - i - 1); + _elements[i] = e; } } } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } } - /* ------------------------------------------------------------ */ private boolean grow() { - if (_growCapacity<=0) + if (_growCapacity <= 0) return false; - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { - final int head=_head; - final int tail=_tail; - final int new_tail; + final int head = _head; + final int tail = _tail; + final int newTail; + final int capacity = _capacity; - Object[] elements=new Object[_capacity+_growCapacity]; + Object[] elements = new Object[capacity + _growCapacity]; - if (headtail || _size.get()>0) + else if (head > tail || _size.get() > 0) { - new_tail=_capacity+tail-head; - int cut=_capacity-head; - System.arraycopy(_elements,head,elements,0,cut); - System.arraycopy(_elements,0,elements,cut,tail); + newTail = capacity + tail - head; + int cut = capacity - head; + System.arraycopy(_elements, head, elements, 0, cut); + System.arraycopy(_elements, 0, elements, cut, tail); } else { - new_tail=0; + newTail = 0; } - _elements=elements; - _capacity=_elements.length; - _head=0; - _tail=new_tail; + _elements = elements; + _capacity = _elements.length; + _head = 0; + _tail = newTail; return true; } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } - } - /* ------------------------------------------------------------ */ public int drainTo(Collection c) { throw new UnsupportedOperationException(); } - /* ------------------------------------------------------------ */ public int drainTo(Collection c, int maxElements) { throw new UnsupportedOperationException(); } - /* ------------------------------------------------------------ */ public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException { throw new UnsupportedOperationException(); } - /* ------------------------------------------------------------ */ public void put(E o) throws InterruptedException { if (!add(o)) throw new IllegalStateException("full"); } - /* ------------------------------------------------------------ */ public int remainingCapacity() { - _tailLock.lock(); + final Lock tailLock = _tailLock; + tailLock.lock(); try { - _headLock.lock(); + final Lock headLock = _headLock; + headLock.lock(); try { - return getCapacity()-size(); + return getCapacity() - size(); } finally { - _headLock.unlock(); + headLock.unlock(); } } finally { - _tailLock.unlock(); + tailLock.unlock(); } } - - /* ------------------------------------------------------------ */ + // TODO: verify this is not optimized away by the JIT long sumOfSpace() { // this method exists to stop clever optimisers removing the spacers - return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++; + return _space0++ + _space1++ + _space2++ + _space3++ + _space4++ + _space5++ + _space6++ + _space7++; } } diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java index 5b3a27c4297..0ea273f50bd 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/BlockingArrayQueueTest.java @@ -18,71 +18,67 @@ package org.eclipse.jetty.util; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - import java.util.HashSet; import java.util.Random; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import org.eclipse.jetty.toolchain.test.AdvancedRunner; import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.junit.Assert; import org.junit.Test; import org.junit.runner.RunWith; @RunWith(AdvancedRunner.class) public class BlockingArrayQueueTest { - @Test public void testWrap() throws Exception { - BlockingArrayQueue queue = new BlockingArrayQueue(3); + BlockingArrayQueue queue = new BlockingArrayQueue<>(3); - assertEquals(0,queue.size()); + Assert.assertEquals(0, queue.size()); for (int i=0;i<3;i++) { queue.offer("one"); - assertEquals(1,queue.size()); + Assert.assertEquals(1, queue.size()); queue.offer("two"); - assertEquals(2,queue.size()); + Assert.assertEquals(2, queue.size()); queue.offer("three"); - assertEquals(3,queue.size()); + Assert.assertEquals(3, queue.size()); - assertEquals("one",queue.get(0)); - assertEquals("two",queue.get(1)); - assertEquals("three",queue.get(2)); + Assert.assertEquals("one", queue.get(0)); + Assert.assertEquals("two", queue.get(1)); + Assert.assertEquals("three", queue.get(2)); - assertEquals("[one, two, three]",queue.toString()); + Assert.assertEquals("[one, two, three]", queue.toString()); - assertEquals("one",queue.poll()); - assertEquals(2,queue.size()); + Assert.assertEquals("one", queue.poll()); + Assert.assertEquals(2, queue.size()); - assertEquals("two",queue.poll()); - assertEquals(1,queue.size()); + Assert.assertEquals("two", queue.poll()); + Assert.assertEquals(1, queue.size()); - assertEquals("three",queue.poll()); - assertEquals(0,queue.size()); + Assert.assertEquals("three", queue.poll()); + Assert.assertEquals(0, queue.size()); queue.offer("xxx"); - assertEquals(1,queue.size()); - assertEquals("xxx",queue.poll()); - assertEquals(0,queue.size()); - + Assert.assertEquals(1, queue.size()); + Assert.assertEquals("xxx", queue.poll()); + Assert.assertEquals(0, queue.size()); } - } @Test public void testRemove() throws Exception { - BlockingArrayQueue queue = new BlockingArrayQueue(3,3); + BlockingArrayQueue queue = new BlockingArrayQueue<>(3,3); queue.add("0"); queue.add("x"); @@ -96,23 +92,36 @@ public class BlockingArrayQueueTest } for (int i=0;i<99;i++) - assertEquals(i+"!",queue.get(i)); + Assert.assertEquals(i + "!", queue.get(i)); + } + + @Test + public void testLimit() throws Exception + { + BlockingArrayQueue queue = new BlockingArrayQueue<>(1,0,1); + + String element = "0"; + Assert.assertTrue(queue.add(element)); + Assert.assertFalse(queue.offer("1")); + + Assert.assertEquals(element, queue.poll()); + Assert.assertTrue(queue.add(element)); } @Test public void testGrow() throws Exception { - BlockingArrayQueue queue = new BlockingArrayQueue(3,2); - assertEquals(3,queue.getCapacity()); + BlockingArrayQueue queue = new BlockingArrayQueue<>(3,2); + Assert.assertEquals(3, queue.getCapacity()); queue.add("a"); queue.add("a"); - assertEquals(2,queue.size()); - assertEquals(3,queue.getCapacity()); + Assert.assertEquals(2, queue.size()); + Assert.assertEquals(3, queue.getCapacity()); queue.add("a"); queue.add("a"); - assertEquals(4,queue.size()); - assertEquals(5,queue.getCapacity()); + Assert.assertEquals(4, queue.size()); + Assert.assertEquals(5, queue.getCapacity()); int s=5; int c=5; @@ -120,25 +129,25 @@ public class BlockingArrayQueueTest for (int t=0;t<100;t++) { - assertEquals(s,queue.size()); - assertEquals(c,queue.getCapacity()); + Assert.assertEquals(s, queue.size()); + Assert.assertEquals(c, queue.getCapacity()); for (int i=queue.size();i-->0;) queue.poll(); - assertEquals(0,queue.size()); - assertEquals(c,queue.getCapacity()); + Assert.assertEquals(0, queue.size()); + Assert.assertEquals(c, queue.getCapacity()); for (int i=queue.getCapacity();i-->0;) queue.add("a"); queue.add("a"); - assertEquals(s+1,queue.size()); - assertEquals(c+2,queue.getCapacity()); + Assert.assertEquals(s + 1, queue.size()); + Assert.assertEquals(c + 2, queue.getCapacity()); queue.poll(); queue.add("a"); queue.add("a"); - assertEquals(s+2,queue.size()); - assertEquals(c+2,queue.getCapacity()); + Assert.assertEquals(s + 2, queue.size()); + Assert.assertEquals(c + 2, queue.getCapacity()); s+=2; c+=2; @@ -151,7 +160,7 @@ public class BlockingArrayQueueTest { final String[] data=new String[4]; - final BlockingArrayQueue queue = new BlockingArrayQueue(); + final BlockingArrayQueue queue = new BlockingArrayQueue<>(); Thread thread = new Thread() { @@ -168,8 +177,8 @@ public class BlockingArrayQueueTest } catch(Exception e) { - assertTrue(false); e.printStackTrace(); + Assert.fail(); } } }; @@ -183,15 +192,12 @@ public class BlockingArrayQueueTest queue.offer("two"); thread.join(); - assertEquals("zero",data[0]); - assertEquals("one",data[1]); - assertEquals("two",data[2]); - assertEquals(null,data[3]); - + Assert.assertEquals("zero", data[0]); + Assert.assertEquals("one", data[1]); + Assert.assertEquals("two", data[2]); + Assert.assertEquals(null, data[3]); } - volatile boolean _running; - @Test @Slow public void testConcurrentAccess() throws Exception @@ -199,19 +205,17 @@ public class BlockingArrayQueueTest final int THREADS=50; final int LOOPS=1000; - final BlockingArrayQueue queue = new BlockingArrayQueue(1+THREADS*LOOPS); + final BlockingArrayQueue queue = new BlockingArrayQueue<>(1+THREADS*LOOPS); - final ConcurrentLinkedQueue produced=new ConcurrentLinkedQueue(); - final ConcurrentLinkedQueue consumed=new ConcurrentLinkedQueue(); + final ConcurrentLinkedQueue produced=new ConcurrentLinkedQueue<>(); + final ConcurrentLinkedQueue consumed=new ConcurrentLinkedQueue<>(); - - _running=true; + final AtomicBoolean running = new AtomicBoolean(true); // start consumers final CyclicBarrier barrier0 = new CyclicBarrier(THREADS+1); for (int i=0;i prodSet = new HashSet(produced); - HashSet consSet = new HashSet(consumed); + HashSet prodSet = new HashSet<>(produced); + HashSet consSet = new HashSet<>(consumed); - assertEquals(prodSet,consSet); + Assert.assertEquals(prodSet, consSet); } }