Merge remote-tracking branch 'origin/jetty-9.3.x'

This commit is contained in:
Jan Bartel 2016-04-25 16:12:26 +10:00
commit 19c9697af3
6 changed files with 248 additions and 120 deletions

View File

@ -25,6 +25,33 @@
<onlyAnalyze>org.eclipse.jetty.alpn.*</onlyAnalyze> <onlyAnalyze>org.eclipse.jetty.alpn.*</onlyAnalyze>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>1.7</version>
<executions>
<execution>
<id>parse-version</id>
<goals>
<goal>parse-version</goal>
</goals>
<configuration>
<propertyPrefix>alpn</propertyPrefix>
<versionString>${alpn.api.version}</versionString>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.felix</groupId>
<artifactId>maven-bundle-plugin</artifactId>
<configuration>
<instructions>
<Bundle-SymbolicName>${bundle-symbolic-name};singleton:=true</Bundle-SymbolicName>
<Import-Package>org.eclipse.jetty.alpn;version="${alpn.majorVersion}.${alpn.minorVersion}.${alpn.incrementalVersion}",*</Import-Package>
</instructions>
</configuration>
</plugin>
</plugins> </plugins>
</build> </build>
<dependencies> <dependencies>

View File

@ -19,25 +19,35 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
public class ArrayByteBufferPool implements ByteBufferPool public class ArrayByteBufferPool implements ByteBufferPool
{ {
private final int _min; private final int _min;
private final Bucket[] _direct; private final int _maxQueue;
private final Bucket[] _indirect; private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private final int _inc; private final int _inc;
public ArrayByteBufferPool() public ArrayByteBufferPool()
{ {
this(0,1024,64*1024); this(-1,-1,-1,-1);
} }
public ArrayByteBufferPool(int minSize, int increment, int maxSize) public ArrayByteBufferPool(int minSize, int increment, int maxSize)
{ {
this(minSize,increment,maxSize,-1);
}
public ArrayByteBufferPool(int minSize, int increment, int maxSize, int maxQueue)
{
if (minSize<=0)
minSize=0;
if (increment<=0)
increment=1024;
if (maxSize<=0)
maxSize=64*1024;
if (minSize>=increment) if (minSize>=increment)
throw new IllegalArgumentException("minSize >= increment"); throw new IllegalArgumentException("minSize >= increment");
if ((maxSize%increment)!=0 || increment>=maxSize) if ((maxSize%increment)!=0 || increment>=maxSize)
@ -45,31 +55,28 @@ public class ArrayByteBufferPool implements ByteBufferPool
_min=minSize; _min=minSize;
_inc=increment; _inc=increment;
_direct=new Bucket[maxSize/increment]; _direct=new ByteBufferPool.Bucket[maxSize/increment];
_indirect=new Bucket[maxSize/increment]; _indirect=new ByteBufferPool.Bucket[maxSize/increment];
_maxQueue=maxQueue;
int size=0; int size=0;
for (int i=0;i<_direct.length;i++) for (int i=0;i<_direct.length;i++)
{ {
size+=_inc; size+=_inc;
_direct[i]=new Bucket(size); _direct[i]=new ByteBufferPool.Bucket(size,_maxQueue);
_indirect[i]=new Bucket(size); _indirect[i]=new ByteBufferPool.Bucket(size,_maxQueue);
} }
} }
@Override @Override
public ByteBuffer acquire(int size, boolean direct) public ByteBuffer acquire(int size, boolean direct)
{ {
Bucket bucket = bucketFor(size,direct); ByteBufferPool.Bucket bucket = bucketFor(size,direct);
ByteBuffer buffer = bucket==null?null:bucket._queue.poll(); if (bucket==null)
return direct ? BufferUtil.allocateDirect(size) : BufferUtil.allocate(size);
if (buffer == null)
{ return bucket.acquire(direct);
int capacity = bucket==null?size:bucket._size;
buffer = direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity);
}
return buffer;
} }
@Override @Override
@ -77,12 +84,9 @@ public class ArrayByteBufferPool implements ByteBufferPool
{ {
if (buffer!=null) if (buffer!=null)
{ {
Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect()); ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect());
if (bucket!=null) if (bucket!=null)
{ bucket.release(buffer);
BufferUtil.clear(buffer);
bucket._queue.offer(buffer);
}
} }
} }
@ -90,43 +94,25 @@ public class ArrayByteBufferPool implements ByteBufferPool
{ {
for (int i=0;i<_direct.length;i++) for (int i=0;i<_direct.length;i++)
{ {
_direct[i]._queue.clear(); _direct[i].clear();
_indirect[i]._queue.clear(); _indirect[i].clear();
} }
} }
private Bucket bucketFor(int size,boolean direct) private ByteBufferPool.Bucket bucketFor(int size,boolean direct)
{ {
if (size<=_min) if (size<=_min)
return null; return null;
int b=(size-1)/_inc; int b=(size-1)/_inc;
if (b>=_direct.length) if (b>=_direct.length)
return null; return null;
Bucket bucket = direct?_direct[b]:_indirect[b]; ByteBufferPool.Bucket bucket = direct?_direct[b]:_indirect[b];
return bucket; return bucket;
} }
public static class Bucket
{
public final int _size;
public final Queue<ByteBuffer> _queue= new ConcurrentLinkedQueue<>();
Bucket(int size)
{
_size=size;
}
@Override
public String toString()
{
return String.format("Bucket@%x{%d,%d}",hashCode(),_size,_queue.size());
}
}
// Package local for testing // Package local for testing
Bucket[] bucketsFor(boolean direct) ByteBufferPool.Bucket[] bucketsFor(boolean direct)
{ {
return direct ? _direct : _indirect; return direct ? _direct : _indirect;
} }

View File

@ -21,8 +21,11 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.ConcurrentArrayQueue;
/** /**
* <p>A {@link ByteBuffer} pool.</p> * <p>A {@link ByteBuffer} pool.</p>
@ -115,4 +118,71 @@ public interface ByteBufferPool
recycles.clear(); recycles.clear();
} }
} }
class Bucket
{
private final int _capacity;
private final AtomicInteger _space;
private final Queue<ByteBuffer> _queue= new ConcurrentArrayQueue<>();
public Bucket(int bufferSize,int maxSize)
{
_capacity=bufferSize;
_space=maxSize>0?new AtomicInteger(maxSize):null;
}
public void release(ByteBuffer buffer)
{
BufferUtil.clear(buffer);
if (_space==null)
_queue.offer(buffer);
else if (_space.decrementAndGet()>=0)
_queue.offer(buffer);
else
_space.incrementAndGet();
}
public ByteBuffer acquire(boolean direct)
{
ByteBuffer buffer = _queue.poll();
if (buffer == null)
return direct ? BufferUtil.allocateDirect(_capacity) : BufferUtil.allocate(_capacity);
if (_space!=null)
_space.incrementAndGet();
return buffer;
}
public void clear()
{
if (_space==null)
_queue.clear();
else
{
int s=_space.getAndSet(0);
while(s-->0)
{
if (_queue.poll()==null)
_space.incrementAndGet();
}
}
}
boolean isEmpty()
{
return _queue.isEmpty();
}
int size()
{
return _queue.size();
}
@Override
public String toString()
{
return String.format("Bucket@%x{%d,%d}",hashCode(),_capacity,_queue.size());
}
}
} }

View File

@ -19,9 +19,7 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -29,39 +27,37 @@ import org.eclipse.jetty.util.BufferUtil;
public class MappedByteBufferPool implements ByteBufferPool public class MappedByteBufferPool implements ByteBufferPool
{ {
private final ConcurrentMap<Integer, Queue<ByteBuffer>> directBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Bucket> directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Queue<ByteBuffer>> heapBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Bucket> heapBuffers = new ConcurrentHashMap<>();
private final int factor; private final int _factor;
private final int _maxQueue;
public MappedByteBufferPool() public MappedByteBufferPool()
{ {
this(1024); this(-1);
} }
public MappedByteBufferPool(int factor) public MappedByteBufferPool(int factor)
{ {
this.factor = factor; this(factor,-1);
}
public MappedByteBufferPool(int factor,int maxQueue)
{
_factor = factor<=0?1024:factor;
_maxQueue=maxQueue;
} }
@Override @Override
public ByteBuffer acquire(int size, boolean direct) public ByteBuffer acquire(int size, boolean direct)
{ {
int bucket = bucketFor(size); int b = bucketFor(size);
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(direct); ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
ByteBuffer result = null; Bucket bucket = buffers.get(b);
Queue<ByteBuffer> byteBuffers = buffers.get(bucket); if (bucket==null)
if (byteBuffers != null) return newByteBuffer(b*_factor, direct);
result = byteBuffers.poll(); return bucket.acquire(direct);
if (result == null)
{
int capacity = bucket * factor;
result = newByteBuffer(capacity, direct);
}
BufferUtil.clear(result);
return result;
} }
protected ByteBuffer newByteBuffer(int capacity, boolean direct) protected ByteBuffer newByteBuffer(int capacity, boolean direct)
@ -77,23 +73,13 @@ public class MappedByteBufferPool implements ByteBufferPool
return; // nothing to do return; // nothing to do
// validate that this buffer is from this pool // validate that this buffer is from this pool
assert((buffer.capacity() % factor) == 0); assert((buffer.capacity() % _factor) == 0);
int bucket = bucketFor(buffer.capacity()); int b = bucketFor(buffer.capacity());
ConcurrentMap<Integer, Queue<ByteBuffer>> buffers = buffersFor(buffer.isDirect()); ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect());
// Avoid to create a new queue every time, just to be discarded immediately Bucket bucket = buckets.computeIfAbsent(b,bi->new Bucket(b*_factor,_maxQueue));
Queue<ByteBuffer> byteBuffers = buffers.get(bucket); bucket.release(buffer);
if (byteBuffers == null)
{
byteBuffers = new ConcurrentLinkedQueue<>();
Queue<ByteBuffer> existing = buffers.putIfAbsent(bucket, byteBuffers);
if (existing != null)
byteBuffers = existing;
}
BufferUtil.clear(buffer);
byteBuffers.offer(buffer);
} }
public void clear() public void clear()
@ -104,14 +90,14 @@ public class MappedByteBufferPool implements ByteBufferPool
private int bucketFor(int size) private int bucketFor(int size)
{ {
int bucket = size / factor; int bucket = size / _factor;
if (size % factor > 0) if (size % _factor > 0)
++bucket; ++bucket;
return bucket; return bucket;
} }
// Package local for testing // Package local for testing
ConcurrentMap<Integer, Queue<ByteBuffer>> buffersFor(boolean direct) ConcurrentMap<Integer, Bucket> bucketsFor(boolean direct)
{ {
return direct ? directBuffers : heapBuffers; return direct ? directBuffers : heapBuffers;
} }

View File

@ -24,8 +24,12 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
public class ArrayByteBufferPoolTest public class ArrayByteBufferPoolTest
@ -34,7 +38,7 @@ public class ArrayByteBufferPoolTest
public void testMinimumRelease() throws Exception public void testMinimumRelease() throws Exception
{ {
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=1;size<=9;size++) for (int size=1;size<=9;size++)
{ {
@ -42,13 +46,13 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertEquals(size,buffer.capacity()); assertEquals(size,buffer.capacity());
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty()); assertTrue(bucket.isEmpty());
bufferPool.release(buffer); bufferPool.release(buffer);
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty()); assertTrue(bucket.isEmpty());
} }
} }
@ -56,7 +60,7 @@ public class ArrayByteBufferPoolTest
public void testMaxRelease() throws Exception public void testMaxRelease() throws Exception
{ {
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=999;size<=1001;size++) for (int size=999;size<=1001;size++)
{ {
@ -65,15 +69,15 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertThat(buffer.capacity(),greaterThanOrEqualTo(size)); assertThat(buffer.capacity(),greaterThanOrEqualTo(size));
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty()); assertTrue(bucket.isEmpty());
bufferPool.release(buffer); bufferPool.release(buffer);
int pooled=0; int pooled=0;
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
{ {
pooled+=bucket._queue.size(); pooled+=bucket.size();
} }
assertEquals(size<=1000,1==pooled); assertEquals(size<=1000,1==pooled);
} }
@ -83,7 +87,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireRelease() throws Exception public void testAcquireRelease() throws Exception
{ {
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=390;size<=510;size++) for (int size=390;size<=510;size++)
{ {
@ -92,19 +96,19 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size)); assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket._queue.isEmpty()); assertTrue(bucket.isEmpty());
bufferPool.release(buffer); bufferPool.release(buffer);
int pooled=0; int pooled=0;
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
{ {
if (!bucket._queue.isEmpty()) if (!bucket.isEmpty())
{ {
pooled+=bucket._queue.size(); pooled+=bucket.size();
assertThat(bucket._size,greaterThanOrEqualTo(size)); // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
assertThat(bucket._size,Matchers.lessThan(size+100)); // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
} }
} }
assertEquals(1,pooled); assertEquals(1,pooled);
@ -115,7 +119,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireReleaseAcquire() throws Exception public void testAcquireReleaseAcquire() throws Exception
{ {
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=390;size<=510;size++) for (int size=390;size<=510;size++)
{ {
@ -128,13 +132,13 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer3); bufferPool.release(buffer3);
int pooled=0; int pooled=0;
for (ArrayByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
{ {
if (!bucket._queue.isEmpty()) if (!bucket.isEmpty())
{ {
pooled+=bucket._queue.size(); pooled+=bucket.size();
assertThat(bucket._size,greaterThanOrEqualTo(size)); // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
assertThat(bucket._size,Matchers.lessThan(size+100)); // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
} }
} }
assertEquals(1,pooled); assertEquals(1,pooled);
@ -143,5 +147,29 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer1!=buffer3); assertTrue(buffer1!=buffer3);
} }
} }
@Test
public void testMaxQueue() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1,-1,-1,2);
ByteBuffer buffer1 = bufferPool.acquire(512, false);
ByteBuffer buffer2 = bufferPool.acquire(512, false);
ByteBuffer buffer3 = bufferPool.acquire(512, false);
Bucket[] buckets = bufferPool.bucketsFor(false);
Arrays.asList(buckets).forEach(b->assertEquals(0,b.size()));
bufferPool.release(buffer1);
Bucket bucket=Arrays.asList(buckets).stream().filter(b->b.size()>0).findFirst().get();
assertEquals(1, bucket.size());
bufferPool.release(buffer2);
assertEquals(2, bucket.size());
bufferPool.release(buffer3);
assertEquals(2, bucket.size());
}
} }

View File

@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail; import static org.junit.Assert.fail;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.junit.Test; import org.junit.Test;
@ -40,51 +40,56 @@ public class MappedByteBufferPoolTest
public void testAcquireRelease() throws Exception public void testAcquireRelease() throws Exception
{ {
MappedByteBufferPool bufferPool = new MappedByteBufferPool(); MappedByteBufferPool bufferPool = new MappedByteBufferPool();
ConcurrentMap<Integer,Queue<ByteBuffer>> buffers = bufferPool.buffersFor(true); ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(true);
int size = 512; int size = 512;
ByteBuffer buffer = bufferPool.acquire(size, true); ByteBuffer buffer = bufferPool.acquire(size, true);
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size)); assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
assertTrue(buffers.isEmpty()); assertTrue(buckets.isEmpty());
bufferPool.release(buffer); bufferPool.release(buffer);
assertEquals(1, buffers.size()); assertEquals(1, buckets.size());
assertEquals(1, buckets.values().iterator().next().size());
} }
@Test @Test
public void testAcquireReleaseAcquire() throws Exception public void testAcquireReleaseAcquire() throws Exception
{ {
MappedByteBufferPool bufferPool = new MappedByteBufferPool(); MappedByteBufferPool bufferPool = new MappedByteBufferPool();
ConcurrentMap<Integer,Queue<ByteBuffer>> buffers = bufferPool.buffersFor(false); ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(false);
ByteBuffer buffer1 = bufferPool.acquire(512, false); ByteBuffer buffer1 = bufferPool.acquire(512, false);
bufferPool.release(buffer1); bufferPool.release(buffer1);
ByteBuffer buffer2 = bufferPool.acquire(512, false); ByteBuffer buffer2 = bufferPool.acquire(512, false);
assertSame(buffer1, buffer2); assertSame(buffer1, buffer2);
assertEquals(1, buckets.size());
assertEquals(0, buckets.values().iterator().next().size());
bufferPool.release(buffer2); bufferPool.release(buffer2);
assertEquals(1, buffers.size()); assertEquals(1, buckets.size());
assertEquals(1, buckets.values().iterator().next().size());
} }
@Test @Test
public void testAcquireReleaseClear() throws Exception public void testAcquireReleaseClear() throws Exception
{ {
MappedByteBufferPool bufferPool = new MappedByteBufferPool(); MappedByteBufferPool bufferPool = new MappedByteBufferPool();
ConcurrentMap<Integer,Queue<ByteBuffer>> buffers = bufferPool.buffersFor(true); ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(true);
ByteBuffer buffer = bufferPool.acquire(512, true); ByteBuffer buffer = bufferPool.acquire(512, true);
bufferPool.release(buffer); bufferPool.release(buffer);
assertEquals(1, buffers.size()); assertEquals(1, buckets.size());
assertEquals(1, buckets.values().iterator().next().size());
bufferPool.clear(); bufferPool.clear();
assertTrue(buffers.isEmpty()); assertTrue(buckets.isEmpty());
} }
/** /**
@ -128,4 +133,30 @@ public class MappedByteBufferPoolTest
buffer = pool.acquire(1024,false); buffer = pool.acquire(1024,false);
assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002")); assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002"));
} }
@Test
public void testMaxQueue() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool(-1,2);
ConcurrentMap<Integer,Bucket> buckets = bufferPool.bucketsFor(false);
ByteBuffer buffer1 = bufferPool.acquire(512, false);
ByteBuffer buffer2 = bufferPool.acquire(512, false);
ByteBuffer buffer3 = bufferPool.acquire(512, false);
assertEquals(0, buckets.size());
bufferPool.release(buffer1);
assertEquals(1, buckets.size());
Bucket bucket=buckets.values().iterator().next();
assertEquals(1, bucket.size());
bufferPool.release(buffer2);
assertEquals(2, bucket.size());
bufferPool.release(buffer3);
assertEquals(2, bucket.size());
}
} }