Issue #432 Limit queue size in ByteBufferPools

Created a shared Bucket instance that can count down available space.
This commit is contained in:
Greg Wilkins 2016-04-24 09:58:38 +10:00
parent 367a807592
commit fbdd5e1da6
5 changed files with 221 additions and 120 deletions

View File

@ -19,25 +19,35 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.util.BufferUtil;
public class ArrayByteBufferPool implements ByteBufferPool
{
private final int _min;
private final Bucket[] _direct;
private final Bucket[] _indirect;
private final int _maxQueue;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private final int _inc;
public ArrayByteBufferPool()
{
this(0,1024,64*1024);
this(-1,-1,-1,-1);
}
public ArrayByteBufferPool(int minSize, int increment, int maxSize)
{
this(minSize,increment,maxSize,-1);
}
public ArrayByteBufferPool(int minSize, int increment, int maxSize, int maxQueue)
{
if (minSize<=0)
minSize=0;
if (increment<=0)
increment=1024;
if (maxSize<=0)
maxSize=64*1024;
if (minSize>=increment)
throw new IllegalArgumentException("minSize >= increment");
if ((maxSize%increment)!=0 || increment>=maxSize)
@ -45,31 +55,28 @@ public class ArrayByteBufferPool implements ByteBufferPool
_min=minSize;
_inc=increment;
_direct=new Bucket[maxSize/increment];
_indirect=new Bucket[maxSize/increment];
_direct=new ByteBufferPool.Bucket[maxSize/increment];
_indirect=new ByteBufferPool.Bucket[maxSize/increment];
_maxQueue=maxQueue;
int size=0;
for (int i=0;i<_direct.length;i++)
{
size+=_inc;
_direct[i]=new Bucket(size);
_indirect[i]=new Bucket(size);
_direct[i]=new ByteBufferPool.Bucket(size,_maxQueue);
_indirect[i]=new ByteBufferPool.Bucket(size,_maxQueue);
}
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
Bucket bucket = bucketFor(size,direct);
ByteBuffer buffer = bucket==null?null:bucket._queue.poll();
ByteBufferPool.Bucket bucket = bucketFor(size,direct);
if (bucket==null)
return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size);
if (buffer == null)
{
int capacity = bucket==null?size:bucket._size;
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}
return bucket.acquire(direct);
return buffer;
}
@Override
@ -77,12 +84,9 @@ public class ArrayByteBufferPool implements ByteBufferPool
{
if (buffer!=null)
{
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null)
{
BufferUtil.clear(buffer);
bucket._queue.offer(buffer);
}
bucket.release(buffer);
}
}
@ -90,43 +94,25 @@ public class ArrayByteBufferPool implements ByteBufferPool
{
for (int i=0;i<_direct.length;i++)
{
_direct[i]._queue.clear();
_indirect[i]._queue.clear();
_direct[i].clear();
_indirect[i].clear();
}
}
private Bucket bucketFor(int size,boolean direct)
private ByteBufferPool.Bucket bucketFor(int size,boolean direct)
{
if (size<=_min)
return null;
int b=(size-1)/_inc;
if (b>=_direct.length)
return null;
Bucket bucket = direct?_direct[b]:_indirect[b];
ByteBufferPool.Bucket bucket = direct?_direct[b]:_indirect[b];
return bucket;
}
public static class Bucket
{
public final int _size;
public final Queue<ByteBuffer> _queue= new ConcurrentLinkedQueue<>();
Bucket(int size)
{
_size=size;
}
@Override
public String toString()
{
return String.format("Bucket@%x{%d,%d}",hashCode(),_size,_queue.size());
}
}
// Package local for testing
Bucket[] bucketsFor(boolean direct)
ByteBufferPool.Bucket[] bucketsFor(boolean direct)
{
return direct ? _direct : _indirect;
}

View File

@ -21,8 +21,11 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
/**
* <p>A {@link ByteBuffer} pool.</p>
@ -115,4 +118,71 @@ public interface ByteBufferPool
recycles.clear();
}
}
class Bucket
{
private final int _capacity;
private final AtomicInteger _space;
private final Queue<ByteBuffer> _queue= new ConcurrentArrayQueue<>();
public Bucket(int bufferSize,int maxSize)
{
_capacity=bufferSize;
_space=maxSize>0?new AtomicInteger(maxSize):null;
}
public void release(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
if (_space==null)
_queue.offer(buffer);
else if (_space.decrementAndGet()>=0)
_queue.offer(buffer);
else
_space.incrementAndGet();
}
public ByteBuffer acquire(boolean direct)
{
ByteBuffer buffer = _queue.poll();
if (buffer == null)
return direct ? BufferUtil.allocateDirect(_capacity) : BufferUtil.allocate(_capacity);
if (_space!=null)
_space.incrementAndGet();
return buffer;
}
public void clear()
{
if (_space==null)
_queue.clear();
else
{
int s=_space.getAndSet(0);
while(s-->0)
{
if (_queue.poll()==null)
_space.incrementAndGet();
}
}
}
boolean isEmpty()
{
return _queue.isEmpty();
}
int size()
{
return _queue.size();
}
@Override
public String toString()
{
return String.format("Bucket@%x{%d,%d}",hashCode(),_capacity,_queue.size());
}
}
}

View File

@ -19,9 +19,7 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@ -29,39 +27,37 @@ import org.eclipse.jetty.util.BufferUtil;
public class MappedByteBufferPool implements ByteBufferPool
{
private final ConcurrentMap<Integer, Queue<ByteBuffer>> directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Queue<ByteBuffer>> heapBuffers = new ConcurrentHashMap<>();
private final int factor;
private final ConcurrentMap<Integer, Bucket> directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Bucket> heapBuffers = new ConcurrentHashMap<>();
private final int _factor;
private final int _maxQueue;
public MappedByteBufferPool()
{
this(1024);
this(-1);
}
public MappedByteBufferPool(int factor)
{
this.factor = factor;
this(factor,-1);
}
public MappedByteBufferPool(int factor,int maxQueue)
{
_factor = factor<=0?1024:factor;
_maxQueue=maxQueue;
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
int bucket = bucketFor(size);
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(direct);
int b = bucketFor(size);
ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
ByteBuffer result = null;
Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
if (byteBuffers != null)
result = byteBuffers.poll();
if (result == null)
{
int capacity = bucket * factor;
result = newByteBuffer(capacity, direct);
}
BufferUtil.clear(result);
return result;
Bucket bucket = buffers.get(b);
if (bucket==null)
return newByteBuffer(b*_factor, direct);
return bucket.acquire(direct);
}
protected ByteBuffer newByteBuffer(int capacity, boolean direct)
@ -77,23 +73,13 @@ public class MappedByteBufferPool implements ByteBufferPool
return; // nothing to do
// validate that this buffer is from this pool
assert((buffer.capacity() % factor) == 0);
assert((buffer.capacity() % _factor) == 0);
int bucket = bucketFor(buffer.capacity());
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect());
int b = bucketFor(buffer.capacity());
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect());
// Avoid to create a new queue every time, just to be discarded immediately
Queue<ByteBuffer> byteBuffers = buffers.get(bucket);
if (byteBuffers == null)
{
byteBuffers = new ConcurrentLinkedQueue<>();
Queue<ByteBuffer> existing = buffers.putIfAbsent(bucket, byteBuffers);
if (existing != null)
byteBuffers = existing;
}
BufferUtil.clear(buffer);
byteBuffers.offer(buffer);
Bucket bucket = buckets.computeIfAbsent(b,bi->new Bucket(b*_factor,_maxQueue));
bucket.release(buffer);
}
public void clear()
@ -104,14 +90,14 @@ public class MappedByteBufferPool implements ByteBufferPool
private int bucketFor(int size)
{
int bucket = size / factor;
if (size % factor > 0)
int bucket = size / _factor;
if (size % _factor > 0)
++bucket;
return bucket;
}
// Package local for testing
ConcurrentMap<Integer, Queue<ByteBuffer>> buffersFor(boolean direct)
ConcurrentMap<Integer, Bucket> bucketsFor(boolean direct)
{
return direct ? directBuffers : heapBuffers;
}

View File

@ -24,8 +24,12 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;
public class ArrayByteBufferPoolTest
@ -34,7 +38,7 @@ public class ArrayByteBufferPoolTest
public void testMinimumRelease() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=1;size<=9;size++)
{
@ -42,13 +46,13 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertEquals(size,buffer.capacity());
for (ArrayByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty());
for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty());
bufferPool.release(buffer);
for (ArrayByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty());
for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty());
}
}
@ -56,7 +60,7 @@ public class ArrayByteBufferPoolTest
public void testMaxRelease() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=999;size<=1001;size++)
{
@ -65,15 +69,15 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(),greaterThanOrEqualTo(size));
for (ArrayByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty());
for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty());
bufferPool.release(buffer);
int pooled=0;
for (ArrayByteBufferPool.Bucket bucket : buckets)
for (ByteBufferPool.Bucket bucket : buckets)
{
pooled+=bucket._queue.size();
pooled+=bucket.size();
}
assertEquals(size<=1000,1==pooled);
}
@ -83,7 +87,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireRelease() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=390;size<=510;size++)
{
@ -92,19 +96,19 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ArrayByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty());
for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty());
bufferPool.release(buffer);
int pooled=0;
for (ArrayByteBufferPool.Bucket bucket : buckets)
for (ByteBufferPool.Bucket bucket : buckets)
{
if (!bucket._queue.isEmpty())
if (!bucket.isEmpty())
{
pooled+=bucket._queue.size();
assertThat(bucket._size,greaterThanOrEqualTo(size));
assertThat(bucket._size,Matchers.lessThan(size+100));
pooled+=bucket.size();
// TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
// TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
}
}
assertEquals(1,pooled);
@ -115,7 +119,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireReleaseAcquire() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=390;size<=510;size++)
{
@ -128,13 +132,13 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer3);
int pooled=0;
for (ArrayByteBufferPool.Bucket bucket : buckets)
for (ByteBufferPool.Bucket bucket : buckets)
{
if (!bucket._queue.isEmpty())
if (!bucket.isEmpty())
{
pooled+=bucket._queue.size();
assertThat(bucket._size,greaterThanOrEqualTo(size));
assertThat(bucket._size,Matchers.lessThan(size+100));
pooled+=bucket.size();
// TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
// TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
}
}
assertEquals(1,pooled);
@ -144,4 +148,28 @@ public class ArrayByteBufferPoolTest
}
}
@Test
public void testMaxQueue() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1,-1,-1,2);
ByteBuffer buffer1 = bufferPool.acquire(512, false);
ByteBuffer buffer2 = bufferPool.acquire(512, false);
ByteBuffer buffer3 = bufferPool.acquire(512, false);
Bucket[] buckets = bufferPool.bucketsFor(false);
Arrays.asList(buckets).forEach(b->assertEquals(0,b.size()));
bufferPool.release(buffer1);
Bucket bucket=Arrays.asList(buckets).stream().filter(b->b.size()>0).findFirst().get();
assertEquals(1, bucket.size());
bufferPool.release(buffer2);
assertEquals(2, bucket.size());
bufferPool.release(buffer3);
assertEquals(2, bucket.size());
}
}

View File

@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.junit.Test;
@ -40,51 +40,56 @@ public class MappedByteBufferPoolTest
public void testAcquireRelease() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
ConcurrentMap<Integer,Queue<ByteBuffer>> buffers = bufferPool.buffersFor(true);
ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(true);
int size = 512;
ByteBuffer buffer = bufferPool.acquire(size, true);
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
assertTrue(buffers.isEmpty());
assertTrue(buckets.isEmpty());
bufferPool.release(buffer);
assertEquals(1, buffers.size());
assertEquals(1, buckets.size());
assertEquals(1, buckets.values().iterator().next().size());
}
@Test
public void testAcquireReleaseAcquire() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
ConcurrentMap<Integer,Queue<ByteBuffer>> buffers = bufferPool.buffersFor(false);
ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(false);
ByteBuffer buffer1 = bufferPool.acquire(512, false);
bufferPool.release(buffer1);
ByteBuffer buffer2 = bufferPool.acquire(512, false);
assertSame(buffer1, buffer2);
assertEquals(1, buckets.size());
assertEquals(0, buckets.values().iterator().next().size());
bufferPool.release(buffer2);
assertEquals(1, buffers.size());
assertEquals(1, buckets.size());
assertEquals(1, buckets.values().iterator().next().size());
}
@Test
public void testAcquireReleaseClear() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
ConcurrentMap<Integer,Queue<ByteBuffer>> buffers = bufferPool.buffersFor(true);
ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(true);
ByteBuffer buffer = bufferPool.acquire(512, true);
bufferPool.release(buffer);
assertEquals(1, buffers.size());
assertEquals(1, buckets.size());
assertEquals(1, buckets.values().iterator().next().size());
bufferPool.clear();
assertTrue(buffers.isEmpty());
assertTrue(buckets.isEmpty());
}
/**
@ -128,4 +133,30 @@ public class MappedByteBufferPoolTest
buffer = pool.acquire(1024,false);
assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002"));
}
@Test
public void testMaxQueue() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool(-1,2);
ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(false);
ByteBuffer buffer1 = bufferPool.acquire(512, false);
ByteBuffer buffer2 = bufferPool.acquire(512, false);
ByteBuffer buffer3 = bufferPool.acquire(512, false);
assertEquals(0, buckets.size());
bufferPool.release(buffer1);
assertEquals(1, buckets.size());
Bucket bucket=buckets.values().iterator().next();
assertEquals(1, bucket.size());
bufferPool.release(buffer2);
assertEquals(2, bucket.size());
bufferPool.release(buffer3);
assertEquals(2, bucket.size());
}
}