Issue #5287 - rework CompressionPool to use the jetty-util pool
Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
d78e1f8a30
commit
b239fa07c6
|
@ -18,19 +18,14 @@
|
|||
|
||||
package org.eclipse.jetty.util.compression;
|
||||
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.component.AbstractLifeCycle;
|
||||
|
||||
public abstract class CompressionPool<T> extends AbstractLifeCycle
|
||||
{
|
||||
public static final int INFINITE_CAPACITY = -1;
|
||||
|
||||
private final Queue<T> _pool;
|
||||
private final AtomicInteger _numObjects = new AtomicInteger(0);
|
||||
private int _capacity;
|
||||
private final Pool<T> _pool;
|
||||
|
||||
/**
|
||||
* Create a Pool of {@link T} instances.
|
||||
|
@ -43,8 +38,7 @@ public abstract class CompressionPool<T> extends AbstractLifeCycle
|
|||
*/
|
||||
public CompressionPool(int capacity)
|
||||
{
|
||||
_capacity = capacity;
|
||||
_pool = new ConcurrentLinkedQueue<>();
|
||||
_pool = new Pool<T>(capacity, 1);
|
||||
}
|
||||
|
||||
public int getCapacity()
|
||||
|
@ -66,73 +60,61 @@ public abstract class CompressionPool<T> extends AbstractLifeCycle
|
|||
/**
|
||||
* @return Object taken from the pool if it is not empty or a newly created Object
|
||||
*/
|
||||
public T acquire()
|
||||
public Entry acquire()
|
||||
{
|
||||
T object;
|
||||
|
||||
if (_capacity == 0)
|
||||
object = newObject();
|
||||
else
|
||||
{
|
||||
object = _pool.poll();
|
||||
if (object == null)
|
||||
object = newObject();
|
||||
else if (_capacity > 0)
|
||||
_numObjects.decrementAndGet();
|
||||
}
|
||||
|
||||
return object;
|
||||
return new Entry(_pool.acquire(e -> newObject()));
|
||||
}
|
||||
|
||||
/**
|
||||
* @param object returns this Object to the pool or calls {@link #end(Object)} if the pool is full.
|
||||
* @param entry returns this Object to the pool or calls {@link #end(Object)} if the pool is full.
|
||||
*/
|
||||
public void release(T object)
|
||||
public void release(Entry entry)
|
||||
{
|
||||
if (object == null)
|
||||
return;
|
||||
|
||||
if (_capacity == 0 || !isRunning())
|
||||
{
|
||||
end(object);
|
||||
}
|
||||
else if (_capacity < 0)
|
||||
{
|
||||
reset(object);
|
||||
_pool.add(object);
|
||||
}
|
||||
else
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
int d = _numObjects.get();
|
||||
|
||||
if (d >= _capacity)
|
||||
{
|
||||
end(object);
|
||||
break;
|
||||
}
|
||||
|
||||
if (_numObjects.compareAndSet(d, d + 1))
|
||||
{
|
||||
reset(object);
|
||||
_pool.add(object);
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
entry.release();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void doStop()
|
||||
{
|
||||
T t = _pool.poll();
|
||||
while (t != null)
|
||||
// TODO: We can't use this because it will not end the entries after it removes them.
|
||||
// _pool.close();
|
||||
}
|
||||
|
||||
public class Entry
|
||||
{
|
||||
private T _value;
|
||||
private Pool<T>.Entry _entry;
|
||||
|
||||
Entry(Pool<T>.Entry entry)
|
||||
{
|
||||
end(t);
|
||||
t = _pool.poll();
|
||||
_entry = entry;
|
||||
_value = (_entry != null) ? _entry.getPooled() : newObject();
|
||||
}
|
||||
|
||||
public T get()
|
||||
{
|
||||
return _value;
|
||||
}
|
||||
|
||||
public void release()
|
||||
{
|
||||
// Reset the value for the next usage.
|
||||
reset(_value);
|
||||
|
||||
if (_entry != null)
|
||||
{
|
||||
// If release return false, the entry should be removed and the object should be disposed.
|
||||
if (!_pool.release(_entry))
|
||||
{
|
||||
// TODO: what does it mean if this returns false???
|
||||
if (_pool.remove(_entry))
|
||||
end(_value);
|
||||
}
|
||||
}
|
||||
|
||||
_value = null;
|
||||
_entry = null;
|
||||
}
|
||||
_numObjects.set(0);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue