explicit capacity handling

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@417 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-06-18 00:49:52 +00:00
parent 370604fe76
commit 393aa1556c
1 changed files with 139 additions and 67 deletions

View File

@ -14,13 +14,15 @@
package org.eclipse.jetty.util; package org.eclipse.jetty.util;
import java.util.AbstractList; import java.util.AbstractList;
import java.util.Collection;
import java.util.NoSuchElementException; import java.util.NoSuchElementException;
import java.util.Queue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Queue backed by a circular array. /** Queue backed by a circular array.
* *
@ -38,14 +40,15 @@ import java.util.concurrent.locks.ReentrantLock;
* *
* @param <E> The element type * @param <E> The element type
*/ */
public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E> public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQueue<E>
{ {
public final int DEFAULT_CAPACITY=64; public final int DEFAULT_CAPACITY=128;
public final int DEFAULT_GROWTH=32; public final int DEFAULT_GROWTH=64;
private final int _limit; private final int _limit;
private final AtomicInteger _size=new AtomicInteger(); private final AtomicInteger _size=new AtomicInteger();
private final int _growCapacity; private final int _growCapacity;
private volatile int _capacity;
private Object[] _elements; private Object[] _elements;
private int _head; private int _head;
private int _tail; private int _tail;
@ -60,8 +63,9 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
*/ */
public BlockingArrayQueue() public BlockingArrayQueue()
{ {
_elements=new Object[64]; _elements=new Object[DEFAULT_CAPACITY];
_growCapacity=32; _growCapacity=DEFAULT_GROWTH;
_capacity=_elements.length;
_limit=Integer.MAX_VALUE; _limit=Integer.MAX_VALUE;
} }
@ -72,6 +76,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
public BlockingArrayQueue(int limit) public BlockingArrayQueue(int limit)
{ {
_elements=new Object[limit]; _elements=new Object[limit];
_capacity=_elements.length;
_growCapacity=-1; _growCapacity=-1;
_limit=limit; _limit=limit;
} }
@ -84,6 +89,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
public BlockingArrayQueue(int capacity,int growBy) public BlockingArrayQueue(int capacity,int growBy)
{ {
_elements=new Object[capacity]; _elements=new Object[capacity];
_capacity=_elements.length;
_growCapacity=growBy; _growCapacity=growBy;
_limit=Integer.MAX_VALUE; _limit=Integer.MAX_VALUE;
} }
@ -100,6 +106,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
throw new IllegalArgumentException(); throw new IllegalArgumentException();
_elements=new Object[capacity]; _elements=new Object[capacity];
_capacity=_elements.length;
_growCapacity=growBy; _growCapacity=growBy;
_limit=limit; _limit=limit;
} }
@ -107,7 +114,7 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public int getCapacity() public int getCapacity()
{ {
return _elements.length; return _capacity;
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -141,7 +148,6 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
_headLock.lock(); // Size cannot shrink _headLock.lock(); // Size cannot shrink
try try
{ {
if (_size.get() > 0) if (_size.get() > 0)
e = (E)_elements[_head]; e = (E)_elements[_head];
} }
@ -165,10 +171,9 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
{ {
if (_size.get() >= _limit) if (_size.get() >= _limit)
return false; return false;
else
{
// should we expand array? // should we expand array?
if (_size.get()==_elements.length) if (_size.get()==_capacity)
{ {
_headLock.lock(); // Need to grow array _headLock.lock(); // Need to grow array
try try
@ -184,10 +189,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
// add the element // add the element
_elements[_tail]=e; _elements[_tail]=e;
_tail=(_tail+1)%_elements.length; _tail=(_tail+1)%_capacity;
not_empty=0==_size.getAndIncrement(); not_empty=0==_size.getAndIncrement();
}
} }
finally finally
{ {
@ -225,7 +230,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
{ {
final int head=_head; final int head=_head;
e = (E)_elements[head]; e = (E)_elements[head];
_head=(head+1)%_elements.length; _elements[head]=null;
_head=(head+1)%_capacity;
if (_size.decrementAndGet()>0) if (_size.decrementAndGet()>0)
_notEmpty.signal(); _notEmpty.signal();
@ -267,7 +273,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
final int head=_head; final int head=_head;
e = (E)_elements[head]; e = (E)_elements[head];
_head=(head+1)%_elements.length; _elements[head]=null;
_head=(head+1)%_capacity;
if (_size.decrementAndGet()>0) if (_size.decrementAndGet()>0)
_notEmpty.signal(); _notEmpty.signal();
@ -319,7 +326,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
} }
e = (E)_elements[_head]; e = (E)_elements[_head];
_head=(_head+1)%_elements.length; _elements[_head]=null;
_head=(_head+1)%_capacity;
if (_size.decrementAndGet()>0) if (_size.decrementAndGet()>0)
_notEmpty.signal(); _notEmpty.signal();
@ -389,8 +397,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
if (index<0 || index>=_size.get()) if (index<0 || index>=_size.get())
throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
int i = _head+index; int i = _head+index;
if (i>=_elements.length) if (i>=_capacity)
i-=_elements.length; i-=_capacity;
return (E)_elements[i]; return (E)_elements[i];
} }
finally finally
@ -418,8 +426,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
int i = _head+index; int i = _head+index;
if (i>=_elements.length) if (i>=_capacity)
i-=_elements.length; i-=_capacity;
E old=(E)_elements[i]; E old=(E)_elements[i];
if (i<_tail) if (i<_tail)
@ -430,15 +438,15 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
} }
else else
{ {
System.arraycopy(_elements,i+1,_elements,i,_elements.length-i-1); System.arraycopy(_elements,i+1,_elements,i,_capacity-i-1);
if (_tail>0) if (_tail>0)
{ {
_elements[_elements.length]=_elements[0]; _elements[_capacity]=_elements[0];
System.arraycopy(_elements,1,_elements,0,_tail-1); System.arraycopy(_elements,1,_elements,0,_tail-1);
_tail--; _tail--;
} }
else else
_tail=_elements.length-1; _tail=_capacity-1;
_size.decrementAndGet(); _size.decrementAndGet();
} }
@ -473,8 +481,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")"); throw new IndexOutOfBoundsException("!("+0+"<"+index+"<="+_size+")");
int i = _head+index; int i = _head+index;
if (i>=_elements.length) if (i>=_capacity)
i-=_elements.length; i-=_capacity;
E old=(E)_elements[i]; E old=(E)_elements[i];
_elements[i]=e; _elements[i]=e;
return old; return old;
@ -517,11 +525,11 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
throw new IllegalStateException("full"); throw new IllegalStateException("full");
int i = _head+index; int i = _head+index;
if (i>=_elements.length) if (i>=_capacity)
i-=_elements.length; i-=_capacity;
_size.incrementAndGet(); _size.incrementAndGet();
_tail=(_tail+1)%_elements.length; _tail=(_tail+1)%_capacity;
if (i<_tail) if (i<_tail)
@ -534,10 +542,10 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
if (_tail>0) if (_tail>0)
{ {
System.arraycopy(_elements,0,_elements,1,_tail); System.arraycopy(_elements,0,_elements,1,_tail);
_elements[0]=_elements[_elements.length-1]; _elements[0]=_elements[_capacity-1];
} }
System.arraycopy(_elements,i,_elements,i+1,_elements.length-i-1); System.arraycopy(_elements,i,_elements,i+1,_capacity-i-1);
_elements[i]=e; _elements[i]=e;
} }
} }
@ -559,11 +567,17 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
if (_growCapacity<=0) if (_growCapacity<=0)
return false; return false;
_tailLock.lock();
try
{
_headLock.lock();
try
{
final int head=_head; final int head=_head;
final int tail=_tail; final int tail=_tail;
final int new_tail; final int new_tail;
Object[] elements=new Object[_elements.length+_growCapacity]; Object[] elements=new Object[_capacity+_growCapacity];
if (head<tail) if (head<tail)
{ {
@ -572,8 +586,8 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
} }
else if (head>tail || _size.get()>0) else if (head>tail || _size.get()>0)
{ {
new_tail=_elements.length+tail-head; new_tail=_capacity+tail-head;
int cut=_elements.length-head; int cut=_capacity-head;
System.arraycopy(_elements,head,elements,0,cut); System.arraycopy(_elements,head,elements,0,cut);
System.arraycopy(_elements,0,elements,cut,tail); System.arraycopy(_elements,0,elements,cut,tail);
} }
@ -583,9 +597,67 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements Queue<E>
} }
_elements=elements; _elements=elements;
_capacity=_elements.length;
_head=0; _head=0;
_tail=new_tail; _tail=new_tail;
return true; return true;
} }
finally
{
_headLock.unlock();
}
}
finally
{
_tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
public int drainTo(Collection<? super E> c)
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public int drainTo(Collection<? super E> c, int maxElements)
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public boolean offer(E o, long timeout, TimeUnit unit) throws InterruptedException
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public void put(E o) throws InterruptedException
{
if (!add(o))
throw new IllegalStateException("full");
}
/* ------------------------------------------------------------ */
public int remainingCapacity()
{
_tailLock.lock();
try
{
_headLock.lock();
try
{
return getCapacity()-size();
}
finally
{
_headLock.unlock();
}
}
finally
{
_tailLock.unlock();
}
}
} }