From f2d15f12d5ebcbd6a2e1e8b34ed74dba679ed178 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Mon, 25 Feb 2019 16:14:41 +0100 Subject: [PATCH] Issue #1861 - Limit total bytes pooled by ByteBufferPools. Code cleanups + javadocs. Signed-off-by: Simone Bordet --- .../eclipse/jetty/io/ArrayByteBufferPool.java | 112 +++++++++------ .../org/eclipse/jetty/io/ByteBufferPool.java | 13 +- .../jetty/io/MappedByteBufferPool.java | 56 +++++--- .../jetty/io/ArrayByteBufferPoolTest.java | 128 +++++++++--------- .../jetty/io/MappedByteBufferPoolTest.java | 44 +++--- 5 files changed, 200 insertions(+), 153 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index f2fc9cf6b7e..5c742839abd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -20,93 +20,115 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; + +/** + *

A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.

+ *

Given a capacity {@code factor} of 1024, the first array element holds a queue of ByteBuffers + * each of capacity 1024, the second array element holds a queue of ByteBuffers each of capacity + * 2048, and so on.

+ */ +@ManagedObject public class ArrayByteBufferPool implements ByteBufferPool { - private final int _min; - private final int _maxQueue; + private final int _minCapacity; private final ByteBufferPool.Bucket[] _direct; private final ByteBufferPool.Bucket[] _indirect; - private final int _inc; + private final int _factor; + /** + * Creates a new ArrayByteBufferPool with a default configuration. + */ public ArrayByteBufferPool() { - this(-1,-1,-1,-1); + this(-1, -1, -1, -1); } - public ArrayByteBufferPool(int minSize, int increment, int maxSize) + /** + * Creates a new ArrayByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param factor the capacity factor + * @param maxCapacity the maximum ByteBuffer capacity + */ + public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) { - this(minSize,increment,maxSize,-1); + this(minCapacity, factor, maxCapacity, -1); } - - public ArrayByteBufferPool(int minSize, int increment, int maxSize, int maxQueue) + + /** + * Creates a new ArrayByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param factor the capacity factor + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxQueueLength the maximum ByteBuffer queue length + */ + public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength) { - if (minSize<=0) - minSize=0; - if (increment<=0) - increment=1024; - if (maxSize<=0) - maxSize=64*1024; - if (minSize>=increment) - throw new IllegalArgumentException("minSize >= increment"); - if ((maxSize%increment)!=0 || increment>=maxSize) - throw new IllegalArgumentException("increment must be a divisor of maxSize"); - _min=minSize; - _inc=increment; + if (minCapacity <= 0) + minCapacity = 0; + if (factor <= 0) + factor = 1024; + if (maxCapacity <= 0) + maxCapacity = 64 * 1024; + if ((maxCapacity % factor) != 0 || factor >= maxCapacity) + throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); + _minCapacity = minCapacity; + _factor = factor; - _direct=new ByteBufferPool.Bucket[maxSize/increment]; - _indirect=new ByteBufferPool.Bucket[maxSize/increment]; - _maxQueue=maxQueue; + int length = maxCapacity / factor; + _direct = new ByteBufferPool.Bucket[length]; + _indirect = new ByteBufferPool.Bucket[length]; - int size=0; - for (int i=0;i<_direct.length;i++) + int capacity = 0; + for (int i = 0; i < _direct.length; ++i) { - size+=_inc; - _direct[i]=new ByteBufferPool.Bucket(this,size,_maxQueue); - _indirect[i]=new ByteBufferPool.Bucket(this,size,_maxQueue); + capacity += _factor; + _direct[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength); + _indirect[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength); } } @Override public ByteBuffer acquire(int size, boolean direct) { - ByteBufferPool.Bucket bucket = bucketFor(size,direct); - if (bucket==null) - return newByteBuffer(size,direct); - + ByteBufferPool.Bucket bucket = bucketFor(size, direct); + if (bucket == null) + return newByteBuffer(size, direct); return bucket.acquire(direct); - } @Override public void release(ByteBuffer buffer) { - if (buffer!=null) - { - ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(),buffer.isDirect()); - if (bucket!=null) + if (buffer != null) + { + ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect()); + if (bucket != null) bucket.release(buffer); } } + @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") public void clear() { - for (int i=0;i<_direct.length;i++) + for (int i = 0; i < _direct.length; ++i) { _direct[i].clear(); _indirect[i].clear(); } } - private ByteBufferPool.Bucket bucketFor(int size,boolean direct) + private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct) { - if (size<=_min) + if (capacity <= _minCapacity) return null; - int b=(size-1)/_inc; - if (b>=_direct.length) + int b = (capacity - 1) / _factor; + if (b >= _direct.length) return null; - ByteBufferPool.Bucket bucket = direct?_direct[b]:_indirect[b]; - - return bucket; + return bucketsFor(direct)[b]; } // Package local for testing diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 541953edb31..138affb9327 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -56,6 +56,13 @@ public interface ByteBufferPool */ public void release(ByteBuffer buffer); + /** + *

Creates a new ByteBuffer of the given capacity and the given directness.

+ * + * @param capacity the ByteBuffer capacity + * @param direct the ByteBuffer directness + * @return a newly allocated ByteBuffer + */ default ByteBuffer newByteBuffer(int capacity, boolean direct) { return direct ? BufferUtil.allocateDirect(capacity) : BufferUtil.allocate(capacity); @@ -131,10 +138,10 @@ public interface ByteBufferPool private final int _capacity; private final AtomicInteger _space; - public Bucket(ByteBufferPool pool, int bufferSize, int maxSize) + public Bucket(ByteBufferPool pool, int capacity, int maxSize) { _pool = pool; - _capacity = bufferSize; + _capacity = capacity; _space = maxSize > 0 ? new AtomicInteger(maxSize) : null; } @@ -204,7 +211,7 @@ public interface ByteBufferPool @Override public String toString() { - return String.format("Bucket@%x{%d/%d}", hashCode(), size(), _capacity); + return String.format("%s@%x{%d/%d}", getClass().getSimpleName(), hashCode(), size(), _capacity); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java index a4b40322715..07f1d57d7cb 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java @@ -22,36 +22,58 @@ import java.nio.ByteBuffer; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Function; import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; +/** + *

A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.

+ *

Given a capacity {@code factor} of 1024, the Map entry with key {@code 1} holds a + * queue of ByteBuffers each of capacity 1024, the Map entry with key {@code 2} holds a + * queue of ByteBuffers each of capacity 2048, and so on.

+ */ +@ManagedObject public class MappedByteBufferPool implements ByteBufferPool { private final ConcurrentMap directBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap heapBuffers = new ConcurrentHashMap<>(); private final int _factor; - private final Function _newBucket; + private final int _maxQueueLength; + /** + * Creates a new MappedByteBufferPool with a default configuration. + */ public MappedByteBufferPool() { this(-1); } + /** + * Creates a new MappedByteBufferPool with the given capacity factor. + * + * @param factor the capacity factor + */ public MappedByteBufferPool(int factor) { - this(factor,-1,null); + this(factor, -1); } - - public MappedByteBufferPool(int factor,int maxQueue) + + /** + * Creates a new MappedByteBufferPool with the given capacity factor and max queue length. + * + * @param factor the capacity factor + * @param maxQueueLength the maximum ByteBuffer queue length + */ + public MappedByteBufferPool(int factor, int maxQueueLength) { - this(factor,maxQueue,null); + _factor = factor <= 0 ? 1024 : factor; + _maxQueueLength = maxQueueLength; } - - public MappedByteBufferPool(int factor,int maxQueue,Function newBucket) + + private Bucket newBucket(int key) { - _factor = factor<=0?1024:factor; - _newBucket = newBucket!=null?newBucket:i->new Bucket(this,i*_factor,maxQueue); + return new Bucket(this, key * _factor, _maxQueueLength); } @Override @@ -59,10 +81,9 @@ public class MappedByteBufferPool implements ByteBufferPool { int b = bucketFor(size); ConcurrentMap buffers = bucketsFor(direct); - Bucket bucket = buffers.get(b); - if (bucket==null) - return newByteBuffer(b*_factor, direct); + if (bucket == null) + return newByteBuffer(b * _factor, direct); return bucket.acquire(direct); } @@ -71,17 +92,18 @@ public class MappedByteBufferPool implements ByteBufferPool { if (buffer == null) return; // nothing to do - + // validate that this buffer is from this pool - assert((buffer.capacity() % _factor) == 0); - + assert ((buffer.capacity() % _factor) == 0); + int b = bucketFor(buffer.capacity()); ConcurrentMap buckets = bucketsFor(buffer.isDirect()); - Bucket bucket = buckets.computeIfAbsent(b,_newBucket); + Bucket bucket = buckets.computeIfAbsent(b, this::newBucket); bucket.release(buffer); } + @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") public void clear() { directBuffers.values().forEach(Bucket::clear); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 97f988b1018..f6f912759db 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -18,32 +18,33 @@ package org.eclipse.jetty.io; -import static org.hamcrest.Matchers.greaterThanOrEqualTo; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.hamcrest.MatcherAssert.assertThat; -import static org.junit.jupiter.api.Assertions.assertTrue; - import java.nio.ByteBuffer; import java.util.Arrays; import org.eclipse.jetty.io.ByteBufferPool.Bucket; import org.junit.jupiter.api.Test; -@SuppressWarnings("ReferenceEquality") +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertSame; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class ArrayByteBufferPoolTest { @Test - public void testMinimumRelease() throws Exception + public void testMinimumRelease() { - ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); + ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); - for (int size=1;size<=9;size++) + for (int size = 1; size <= 9; size++) { ByteBuffer buffer = bufferPool.acquire(size, true); assertTrue(buffer.isDirect()); - assertEquals(size,buffer.capacity()); + assertEquals(size, buffer.capacity()); for (ByteBufferPool.Bucket bucket : buckets) assertTrue(bucket.isEmpty()); @@ -55,39 +56,12 @@ public class ArrayByteBufferPoolTest } @Test - public void testMaxRelease() throws Exception + public void testMaxRelease() { - ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); + ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); - for (int size=999;size<=1001;size++) - { - bufferPool.clear(); - ByteBuffer buffer = bufferPool.acquire(size, true); - - assertTrue(buffer.isDirect()); - assertThat(buffer.capacity(),greaterThanOrEqualTo(size)); - for (ByteBufferPool.Bucket bucket : buckets) - assertTrue(bucket.isEmpty()); - - bufferPool.release(buffer); - - int pooled=0; - for (ByteBufferPool.Bucket bucket : buckets) - { - pooled+=bucket.size(); - } - assertEquals(size<=1000,1==pooled); - } - } - - @Test - public void testAcquireRelease() throws Exception - { - ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); - ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); - - for (int size=390;size<=510;size++) + for (int size = 999; size <= 1001; size++) { bufferPool.clear(); ByteBuffer buffer = bufferPool.acquire(size, true); @@ -99,28 +73,54 @@ public class ArrayByteBufferPoolTest bufferPool.release(buffer); - int pooled=0; + int pooled = 0; for (ByteBufferPool.Bucket bucket : buckets) { - if (!bucket.isEmpty()) - { - pooled+=bucket.size(); - // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size)); - // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100)); - } + pooled += bucket.size(); } - assertEquals(1,pooled); + assertEquals(size <= 1000, 1 == pooled); } } @Test - @SuppressWarnings("ReferenceEquality") - public void testAcquireReleaseAcquire() throws Exception + public void testAcquireRelease() { - ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10,100,1000); + ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); - for (int size=390;size<=510;size++) + for (int size = 390; size <= 510; size++) + { + bufferPool.clear(); + ByteBuffer buffer = bufferPool.acquire(size, true); + + assertTrue(buffer.isDirect()); + assertThat(buffer.capacity(), greaterThanOrEqualTo(size)); + for (ByteBufferPool.Bucket bucket : buckets) + assertTrue(bucket.isEmpty()); + + bufferPool.release(buffer); + + int pooled = 0; + for (ByteBufferPool.Bucket bucket : buckets) + { + if (!bucket.isEmpty()) + { + pooled += bucket.size(); + // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size)); + // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100)); + } + } + assertEquals(1, pooled); + } + } + + @Test + public void testAcquireReleaseAcquire() + { + ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000); + ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); + + for (int size = 390; size <= 510; size++) { bufferPool.clear(); ByteBuffer buffer1 = bufferPool.acquire(size, true); @@ -130,45 +130,43 @@ public class ArrayByteBufferPoolTest ByteBuffer buffer3 = bufferPool.acquire(size, false); bufferPool.release(buffer3); - int pooled=0; + int pooled = 0; for (ByteBufferPool.Bucket bucket : buckets) { if (!bucket.isEmpty()) { - pooled+=bucket.size(); + pooled += bucket.size(); // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size)); // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100)); } } - assertEquals(1,pooled); + assertEquals(1, pooled); - assertTrue(buffer1==buffer2); - assertTrue(buffer1!=buffer3); + assertSame(buffer1, buffer2); + assertNotSame(buffer1, buffer3); } } - @Test - public void testMaxQueue() throws Exception + public void testMaxQueue() { - ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1,-1,-1,2); + ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, -1, -1, 2); ByteBuffer buffer1 = bufferPool.acquire(512, false); ByteBuffer buffer2 = bufferPool.acquire(512, false); ByteBuffer buffer3 = bufferPool.acquire(512, false); Bucket[] buckets = bufferPool.bucketsFor(false); - Arrays.asList(buckets).forEach(b->assertEquals(0,b.size())); - + Arrays.asList(buckets).forEach(b -> assertEquals(0, b.size())); + bufferPool.release(buffer1); - Bucket bucket=Arrays.asList(buckets).stream().filter(b->b.size()>0).findFirst().get(); + Bucket bucket = Arrays.stream(buckets).filter(b -> b.size() > 0).findFirst().get(); assertEquals(1, bucket.size()); bufferPool.release(buffer2); assertEquals(2, bucket.size()); - + bufferPool.release(buffer3); assertEquals(2, bucket.size()); } - } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java index ee9589e0d96..c6281767690 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java @@ -26,21 +26,21 @@ import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; import org.junit.jupiter.api.Test; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertSame; -import static org.hamcrest.MatcherAssert.assertThat; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; public class MappedByteBufferPoolTest { @Test - public void testAcquireRelease() throws Exception + public void testAcquireRelease() { MappedByteBufferPool bufferPool = new MappedByteBufferPool(); - ConcurrentMap buckets = bufferPool.bucketsFor(true); + ConcurrentMap buckets = bufferPool.bucketsFor(true); int size = 512; ByteBuffer buffer = bufferPool.acquire(size, true); @@ -56,10 +56,10 @@ public class MappedByteBufferPoolTest } @Test - public void testAcquireReleaseAcquire() throws Exception + public void testAcquireReleaseAcquire() { MappedByteBufferPool bufferPool = new MappedByteBufferPool(); - ConcurrentMap buckets = bufferPool.bucketsFor(false); + ConcurrentMap buckets = bufferPool.bucketsFor(false); ByteBuffer buffer1 = bufferPool.acquire(512, false); bufferPool.release(buffer1); @@ -76,10 +76,10 @@ public class MappedByteBufferPoolTest } @Test - public void testAcquireReleaseClear() throws Exception + public void testAcquireReleaseClear() { MappedByteBufferPool bufferPool = new MappedByteBufferPool(); - ConcurrentMap buckets = bufferPool.bucketsFor(true); + ConcurrentMap buckets = bufferPool.bucketsFor(true); ByteBuffer buffer = bufferPool.acquire(512, true); bufferPool.release(buffer); @@ -91,16 +91,14 @@ public class MappedByteBufferPoolTest assertTrue(buckets.isEmpty()); } - + /** * In a scenario where MappedByteBufferPool is being used improperly, * such as releasing a buffer that wasn't created/acquired by the * MappedByteBufferPool, an assertion is tested for. - * - * @throws Exception test failure */ @Test - public void testReleaseAssertion() throws Exception + public void testReleaseAssertion() { int factor = 1024; MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor); @@ -110,8 +108,8 @@ public class MappedByteBufferPoolTest // Release a few small non-pool buffers bufferPool.release(ByteBuffer.wrap(StringUtil.getUtf8Bytes("Hello"))); - /* NOTES: - * + /* NOTES: + * * 1) This test will pass on command line maven build, as its surefire setup uses "-ea" already. * 2) In Eclipse, goto the "Run Configuration" for this test case. * Select the "Arguments" tab, and make sure "-ea" is present in the text box titled "VM arguments" @@ -123,24 +121,24 @@ public class MappedByteBufferPoolTest // Expected path. } } - + @Test public void testTagged() { MappedByteBufferPool pool = new MappedByteBufferPool.Tagged(); - ByteBuffer buffer = pool.acquire(1024,false); + ByteBuffer buffer = pool.acquire(1024, false); - assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000001")); - buffer = pool.acquire(1024,false); - assertThat(BufferUtil.toDetailString(buffer),containsString("@T00000002")); + assertThat(BufferUtil.toDetailString(buffer), containsString("@T00000001")); + buffer = pool.acquire(1024, false); + assertThat(BufferUtil.toDetailString(buffer), containsString("@T00000002")); } @Test - public void testMaxQueue() throws Exception + public void testMaxQueue() { - MappedByteBufferPool bufferPool = new MappedByteBufferPool(-1,2); - ConcurrentMap buckets = bufferPool.bucketsFor(false); + MappedByteBufferPool bufferPool = new MappedByteBufferPool(-1, 2); + ConcurrentMap buckets = bufferPool.bucketsFor(false); ByteBuffer buffer1 = bufferPool.acquire(512, false); ByteBuffer buffer2 = bufferPool.acquire(512, false); @@ -149,12 +147,12 @@ public class MappedByteBufferPoolTest bufferPool.release(buffer1); assertEquals(1, buckets.size()); - Bucket bucket=buckets.values().iterator().next(); + Bucket bucket = buckets.values().iterator().next(); assertEquals(1, bucket.size()); bufferPool.release(buffer2); assertEquals(2, bucket.size()); - + bufferPool.release(buffer3); assertEquals(2, bucket.size()); }