_queue= new ConcurrentLinkedQueue<>();
-
- Bucket(int size)
- {
- _size=size;
- }
-
- @Override
- public String toString()
- {
- return String.format("Bucket@%x{%d,%d}",hashCode(),_size,_queue.size());
- }
- }
-
-
// Package local for testing
- Bucket[] bucketsFor(boolean direct)
+ ByteBufferPool.Bucket[] bucketsFor(boolean direct)
{
return direct ? _direct : _indirect;
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
index 6067d78fc36..4944dac91c5 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
@@ -21,8 +21,11 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
+import java.util.Queue;
+import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.BufferUtil;
+import org.eclipse.jetty.util.ConcurrentArrayQueue;
/**
* A {@link ByteBuffer} pool.
@@ -115,4 +118,71 @@ public interface ByteBufferPool
recycles.clear();
}
}
+
+ class Bucket
+ {
+ private final int _capacity;
+ private final AtomicInteger _space;
+ private final Queue _queue= new ConcurrentArrayQueue<>();
+
+ public Bucket(int bufferSize,int maxSize)
+ {
+ _capacity=bufferSize;
+ _space=maxSize>0?new AtomicInteger(maxSize):null;
+ }
+
+ public void release(ByteBuffer buffer)
+ {
+ BufferUtil.clear(buffer);
+ if (_space==null)
+ _queue.offer(buffer);
+ else if (_space.decrementAndGet()>=0)
+ _queue.offer(buffer);
+ else
+ _space.incrementAndGet();
+ }
+
+ public ByteBuffer acquire(boolean direct)
+ {
+ ByteBuffer buffer = _queue.poll();
+ if (buffer == null)
+ return direct ? BufferUtil.allocateDirect(_capacity) : BufferUtil.allocate(_capacity);
+ if (_space!=null)
+ _space.incrementAndGet();
+ return buffer;
+ }
+
+ public void clear()
+ {
+ if (_space==null)
+ _queue.clear();
+ else
+ {
+ int s=_space.getAndSet(0);
+ while(s-->0)
+ {
+ if (_queue.poll()==null)
+ _space.incrementAndGet();
+ }
+ }
+ }
+
+ boolean isEmpty()
+ {
+ return _queue.isEmpty();
+ }
+
+ int size()
+ {
+ return _queue.size();
+ }
+
+ @Override
+ public String toString()
+ {
+ return String.format("Bucket@%x{%d,%d}",hashCode(),_capacity,_queue.size());
+ }
+ }
+
+
}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java
index bc9e8147360..335d6ccba7b 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java
@@ -19,9 +19,7 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
-import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
@@ -29,39 +27,37 @@ import org.eclipse.jetty.util.BufferUtil;
public class MappedByteBufferPool implements ByteBufferPool
{
- private final ConcurrentMap> directBuffers = new ConcurrentHashMap<>();
- private final ConcurrentMap> heapBuffers = new ConcurrentHashMap<>();
- private final int factor;
+ private final ConcurrentMap directBuffers = new ConcurrentHashMap<>();
+ private final ConcurrentMap heapBuffers = new ConcurrentHashMap<>();
+ private final int _factor;
+ private final int _maxQueue;
public MappedByteBufferPool()
{
- this(1024);
+ this(-1);
}
public MappedByteBufferPool(int factor)
{
- this.factor = factor;
+ this(factor,-1);
+ }
+
+ public MappedByteBufferPool(int factor,int maxQueue)
+ {
+ _factor = factor<=0?1024:factor;
+ _maxQueue=maxQueue;
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
- int bucket = bucketFor(size);
- ConcurrentMap> buffers = buffersFor(direct);
+ int b = bucketFor(size);
+ ConcurrentMap buffers = bucketsFor(direct);
- ByteBuffer result = null;
- Queue byteBuffers = buffers.get(bucket);
- if (byteBuffers != null)
- result = byteBuffers.poll();
-
- if (result == null)
- {
- int capacity = bucket * factor;
- result = newByteBuffer(capacity, direct);
- }
-
- BufferUtil.clear(result);
- return result;
+ Bucket bucket = buffers.get(b);
+ if (bucket==null)
+ return newByteBuffer(b*_factor, direct);
+ return bucket.acquire(direct);
}
protected ByteBuffer newByteBuffer(int capacity, boolean direct)
@@ -77,23 +73,13 @@ public class MappedByteBufferPool implements ByteBufferPool
return; // nothing to do
// validate that this buffer is from this pool
- assert((buffer.capacity() % factor) == 0);
+ assert((buffer.capacity() % _factor) == 0);
- int bucket = bucketFor(buffer.capacity());
- ConcurrentMap> buffers = buffersFor(buffer.isDirect());
+ int b = bucketFor(buffer.capacity());
+ ConcurrentMap buckets = bucketsFor(buffer.isDirect());
- // Avoid to create a new queue every time, just to be discarded immediately
- Queue byteBuffers = buffers.get(bucket);
- if (byteBuffers == null)
- {
- byteBuffers = new ConcurrentLinkedQueue<>();
- Queue existing = buffers.putIfAbsent(bucket, byteBuffers);
- if (existing != null)
- byteBuffers = existing;
- }
-
- BufferUtil.clear(buffer);
- byteBuffers.offer(buffer);
+ Bucket bucket = buckets.computeIfAbsent(b,bi->new Bucket(b*_factor,_maxQueue));
+ bucket.release(buffer);
}
public void clear()
@@ -104,14 +90,14 @@ public class MappedByteBufferPool implements ByteBufferPool
private int bucketFor(int size)
{
- int bucket = size / factor;
- if (size % factor > 0)
+ int bucket = size / _factor;
+ if (size % _factor > 0)
++bucket;
return bucket;
}
// Package local for testing
- ConcurrentMap> buffersFor(boolean direct)
+ ConcurrentMap bucketsFor(boolean direct)
{
return direct ? directBuffers : heapBuffers;
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
index b8ab2ec84d0..8dff64edc2f 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
@@ -24,8 +24,12 @@ import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.concurrent.ConcurrentMap;
+import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.hamcrest.Matchers;
+import org.junit.Assert;
import org.junit.Test;
public class ArrayByteBufferPoolTest
@@ -34,7 +38,7 @@ public class ArrayByteBufferPoolTest
public void testMinimumRelease() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
- ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
+ ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=1;size<=9;size++)
{
@@ -42,13 +46,13 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertEquals(size,buffer.capacity());
- for (ArrayByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket._queue.isEmpty());
+ for (ByteBufferPool.Bucket bucket : buckets)
+ assertTrue(bucket.isEmpty());
bufferPool.release(buffer);
- for (ArrayByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket._queue.isEmpty());
+ for (ByteBufferPool.Bucket bucket : buckets)
+ assertTrue(bucket.isEmpty());
}
}
@@ -56,7 +60,7 @@ public class ArrayByteBufferPoolTest
public void testMaxRelease() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
- ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
+ ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=999;size<=1001;size++)
{
@@ -65,15 +69,15 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(),greaterThanOrEqualTo(size));
- for (ArrayByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket._queue.isEmpty());
+ for (ByteBufferPool.Bucket bucket : buckets)
+ assertTrue(bucket.isEmpty());
bufferPool.release(buffer);
int pooled=0;
- for (ArrayByteBufferPool.Bucket bucket : buckets)
+ for (ByteBufferPool.Bucket bucket : buckets)
{
- pooled+=bucket._queue.size();
+ pooled+=bucket.size();
}
assertEquals(size<=1000,1==pooled);
}
@@ -83,7 +87,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireRelease() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
- ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
+ ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=390;size<=510;size++)
{
@@ -92,19 +96,19 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
- for (ArrayByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket._queue.isEmpty());
+ for (ByteBufferPool.Bucket bucket : buckets)
+ assertTrue(bucket.isEmpty());
bufferPool.release(buffer);
int pooled=0;
- for (ArrayByteBufferPool.Bucket bucket : buckets)
+ for (ByteBufferPool.Bucket bucket : buckets)
{
- if (!bucket._queue.isEmpty())
+ if (!bucket.isEmpty())
{
- pooled+=bucket._queue.size();
- assertThat(bucket._size,greaterThanOrEqualTo(size));
- assertThat(bucket._size,Matchers.lessThan(size+100));
+ pooled+=bucket.size();
+ // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
+ // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
}
}
assertEquals(1,pooled);
@@ -115,7 +119,7 @@ public class ArrayByteBufferPoolTest
public void testAcquireReleaseAcquire() throws Exception
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000);
- ArrayByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
+ ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size=390;size<=510;size++)
{
@@ -128,13 +132,13 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer3);
int pooled=0;
- for (ArrayByteBufferPool.Bucket bucket : buckets)
+ for (ByteBufferPool.Bucket bucket : buckets)
{
- if (!bucket._queue.isEmpty())
+ if (!bucket.isEmpty())
{
- pooled+=bucket._queue.size();
- assertThat(bucket._size,greaterThanOrEqualTo(size));
- assertThat(bucket._size,Matchers.lessThan(size+100));
+ pooled+=bucket.size();
+ // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
+ // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
}
}
assertEquals(1,pooled);
@@ -143,5 +147,29 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer1!=buffer3);
}
}
+
+
+ @Test
+ public void testMaxQueue() throws Exception
+ {
+ ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1,-1,-1,2);
+
+ ByteBuffer buffer1 = bufferPool.acquire(512, false);
+ ByteBuffer buffer2 = bufferPool.acquire(512, false);
+ ByteBuffer buffer3 = bufferPool.acquire(512, false);
+
+ Bucket[] buckets = bufferPool.bucketsFor(false);
+ Arrays.asList(buckets).forEach(b->assertEquals(0,b.size()));
+
+ bufferPool.release(buffer1);
+ Bucket bucket=Arrays.asList(buckets).stream().filter(b->b.size()>0).findFirst().get();
+ assertEquals(1, bucket.size());
+
+ bufferPool.release(buffer2);
+ assertEquals(2, bucket.size());
+
+ bufferPool.release(buffer3);
+ assertEquals(2, bucket.size());
+ }
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java
index a3815a832aa..e9b2b3942ce 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java
@@ -27,9 +27,9 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.ByteBuffer;
-import java.util.Queue;
import java.util.concurrent.ConcurrentMap;
+import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.junit.Test;
@@ -40,51 +40,56 @@ public class MappedByteBufferPoolTest
public void testAcquireRelease() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
- ConcurrentMap> buffers = bufferPool.buffersFor(true);
+ ConcurrentMap buckets = bufferPool.bucketsFor(true);
int size = 512;
ByteBuffer buffer = bufferPool.acquire(size, true);
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
- assertTrue(buffers.isEmpty());
+ assertTrue(buckets.isEmpty());
bufferPool.release(buffer);
- assertEquals(1, buffers.size());
+ assertEquals(1, buckets.size());
+ assertEquals(1, buckets.values().iterator().next().size());
}
@Test
public void testAcquireReleaseAcquire() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
- ConcurrentMap> buffers = bufferPool.buffersFor(false);
+ ConcurrentMap buckets = bufferPool.bucketsFor(false);
ByteBuffer buffer1 = bufferPool.acquire(512, false);
bufferPool.release(buffer1);
ByteBuffer buffer2 = bufferPool.acquire(512, false);
assertSame(buffer1, buffer2);
+ assertEquals(1, buckets.size());
+ assertEquals(0, buckets.values().iterator().next().size());
bufferPool.release(buffer2);
- assertEquals(1, buffers.size());
+ assertEquals(1, buckets.size());
+ assertEquals(1, buckets.values().iterator().next().size());
}
@Test
public void testAcquireReleaseClear() throws Exception
{
MappedByteBufferPool bufferPool = new MappedByteBufferPool();
- ConcurrentMap> buffers = bufferPool.buffersFor(true);
+ ConcurrentMap buckets = bufferPool.bucketsFor(true);
ByteBuffer buffer = bufferPool.acquire(512, true);
bufferPool.release(buffer);
- assertEquals(1, buffers.size());
+ assertEquals(1, buckets.size());
+ assertEquals(1, buckets.values().iterator().next().size());
bufferPool.clear();
- assertTrue(buffers.isEmpty());
+ assertTrue(buckets.isEmpty());
}
/**
@@ -128,4 +133,30 @@ public class MappedByteBufferPoolTest
buffer = pool.acquire(1024,false);
assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002"));
}
+
+
+
+ @Test
+ public void testMaxQueue() throws Exception
+ {
+ MappedByteBufferPool bufferPool = new MappedByteBufferPool(-1,2);
+ ConcurrentMap buckets = bufferPool.bucketsFor(false);
+
+ ByteBuffer buffer1 = bufferPool.acquire(512, false);
+ ByteBuffer buffer2 = bufferPool.acquire(512, false);
+ ByteBuffer buffer3 = bufferPool.acquire(512, false);
+ assertEquals(0, buckets.size());
+
+ bufferPool.release(buffer1);
+ assertEquals(1, buckets.size());
+ Bucket bucket=buckets.values().iterator().next();
+ assertEquals(1, bucket.size());
+
+ bufferPool.release(buffer2);
+ assertEquals(2, bucket.size());
+
+ bufferPool.release(buffer3);
+ assertEquals(2, bucket.size());
+
+ }
}