From f86a719bce89844337e4f2bde68e8e147095ed80 Mon Sep 17 00:00:00 2001 From: Lachlan Date: Thu, 25 Nov 2021 10:09:10 +1100 Subject: [PATCH] Issue #6974 - improvements & fixes to ByteBufferPool implementations (#7017) - WebSocket should user server ByteBufferPool if possible - fix various bugs ByteBufferPool implementations - add heuristic for maxHeapMemory and maxDirectMemory - Add dump for ByteBufferPools - add LogArrayByteBufferPool that does exponential scaling of bucket size. - ByteBufferPools should default to use maxMemory heuristic - Add module jetty-bytebufferpool-logarithmic Signed-off-by: Lachlan Roberts Co-authored-by: Simone Bordet --- .../jetty/io/AbstractByteBufferPool.java | 34 ++- .../eclipse/jetty/io/ArrayByteBufferPool.java | 131 ++++++--- .../org/eclipse/jetty/io/ByteBufferPool.java | 70 +++-- .../io/LogarithmicArrayByteBufferPool.java | 112 ++++++++ .../jetty/io/MappedByteBufferPool.java | 96 +++++-- .../jetty/io/ArrayByteBufferPoolTest.java | 21 +- .../jetty/io/MappedByteBufferPoolTest.java | 7 +- .../etc/jetty-bytebufferpool-logarithmic.xml | 10 + .../main/config/etc/jetty-bytebufferpool.xml | 4 +- .../modules/bytebufferpool-logarithmic.mod | 30 +++ .../main/config/modules/bytebufferpool.mod | 11 +- .../tests/WebSocketBufferPoolTest.java | 248 ++++++++++++++++++ .../server/WebSocketServerFactory.java | 22 +- 13 files changed, 671 insertions(+), 125 deletions(-) create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/LogarithmicArrayByteBufferPool.java create mode 100644 jetty-server/src/main/config/etc/jetty-bytebufferpool-logarithmic.xml create mode 100644 jetty-server/src/main/config/modules/bytebufferpool-logarithmic.mod create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketBufferPoolTest.java diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java index c46d134dd8b..81b9a0bbe24 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Consumer; +import java.util.function.IntConsumer; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; @@ -32,16 +33,24 @@ abstract class AbstractByteBufferPool implements ByteBufferPool private final int _factor; private final int _maxQueueLength; private final long _maxHeapMemory; - private final AtomicLong _heapMemory = new AtomicLong(); private final long _maxDirectMemory; + private final AtomicLong _heapMemory = new AtomicLong(); private final AtomicLong _directMemory = new AtomicLong(); + /** + * Creates a new ByteBufferPool with the given configuration. + * + * @param factor the capacity factor + * @param maxQueueLength the maximum ByteBuffer queue length + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. + */ protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) { _factor = factor <= 0 ? 1024 : factor; _maxQueueLength = maxQueueLength; - _maxHeapMemory = maxHeapMemory; - _maxDirectMemory = maxDirectMemory; + _maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4; + _maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4; } protected int getCapacityFactor() @@ -54,11 +63,13 @@ abstract class AbstractByteBufferPool implements ByteBufferPool return _maxQueueLength; } + @Deprecated protected void decrementMemory(ByteBuffer buffer) { updateMemory(buffer, false); } + @Deprecated protected void incrementMemory(ByteBuffer buffer) { updateMemory(buffer, true); @@ -95,12 +106,29 @@ abstract class AbstractByteBufferPool implements ByteBufferPool return getMemory(false); } + @ManagedAttribute("The max num of bytes that can be retained from direct ByteBuffers") + public long getMaxDirectMemory() + { + return _maxDirectMemory; + } + + @ManagedAttribute("The max num of bytes that can be retained from heap ByteBuffers") + public long getMaxHeapMemory() + { + return _maxHeapMemory; + } + public long getMemory(boolean direct) { AtomicLong memory = direct ? _directMemory : _heapMemory; return memory.get(); } + IntConsumer updateMemory(boolean direct) + { + return (direct) ? _directMemory::addAndGet : _heapMemory::addAndGet; + } + @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") public void clear() { diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 66d036e7e21..5d708d2f9e7 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -18,14 +18,19 @@ package org.eclipse.jetty.io; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.Objects; -import java.util.function.IntFunction; +import java.util.stream.Collectors; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -36,13 +41,15 @@ import org.eclipse.jetty.util.log.Logger; * 2048, and so on.

*/ @ManagedObject -public class ArrayByteBufferPool extends AbstractByteBufferPool +public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable { private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class); + private final int _maxCapacity; private final int _minCapacity; private final ByteBufferPool.Bucket[] _direct; private final ByteBufferPool.Bucket[] _indirect; + private boolean _detailedDump = false; /** * Creates a new ArrayByteBufferPool with a default configuration. @@ -61,7 +68,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool */ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) { - this(minCapacity, factor, maxCapacity, -1, -1, -1); + this(minCapacity, factor, maxCapacity, -1, 0, 0); } /** @@ -74,7 +81,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool */ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength) { - this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1); + this(minCapacity, factor, maxCapacity, maxQueueLength, 0, 0); } /** @@ -84,8 +91,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool * @param factor the capacity factor * @param maxCapacity the maximum ByteBuffer capacity * @param maxQueueLength the maximum ByteBuffer queue length - * @param maxHeapMemory the max heap memory in bytes - * @param maxDirectMemory the max direct memory in bytes + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. */ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) { @@ -98,24 +105,30 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool maxCapacity = 64 * 1024; if ((maxCapacity % factor) != 0 || factor >= maxCapacity) throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); + _maxCapacity = maxCapacity; _minCapacity = minCapacity; - int length = maxCapacity / factor; + // Initialize all buckets in constructor and never modify the array again. + int length = bucketFor(maxCapacity); _direct = new ByteBufferPool.Bucket[length]; _indirect = new ByteBufferPool.Bucket[length]; + for (int i = 0; i < length; i++) + { + _direct[i] = newBucket(i + 1, true); + _indirect[i] = newBucket(i + 1, false); + } } @Override public ByteBuffer acquire(int size, boolean direct) { - int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor(); - ByteBufferPool.Bucket bucket = bucketFor(size, direct, null); + int capacity = size < _minCapacity ? size : capacityFor(bucketFor(size)); + ByteBufferPool.Bucket bucket = bucketFor(size, direct); if (bucket == null) return newByteBuffer(capacity, direct); ByteBuffer buffer = bucket.acquire(); if (buffer == null) return newByteBuffer(capacity, direct); - decrementMemory(buffer); return buffer; } @@ -127,26 +140,29 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool int capacity = buffer.capacity(); // Validate that this buffer is from this pool. - if ((capacity % getCapacityFactor()) != 0) + if (capacity != capacityFor(bucketFor(capacity))) { if (LOG.isDebugEnabled()) LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer)); return; } + // Don't release into the pool if greater than the maximum ByteBuffer capacity. + if (capacity > _maxCapacity) + return; + boolean direct = buffer.isDirect(); - ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, this::newBucket); + ByteBufferPool.Bucket bucket = bucketFor(capacity, direct); if (bucket != null) { bucket.release(buffer); - incrementMemory(buffer); - releaseExcessMemory(direct, this::clearOldestBucket); + releaseExcessMemory(direct, this::releaseMemory); } } - private Bucket newBucket(int key) + private Bucket newBucket(int key, boolean direct) { - return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength()); + return new Bucket(this, capacityFor(key), getMaxQueueLength(), updateMemory(direct)); } @Override @@ -155,18 +171,12 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool super.clear(); for (int i = 0; i < _direct.length; ++i) { - Bucket bucket = _direct[i]; - if (bucket != null) - bucket.clear(); - _direct[i] = null; - bucket = _indirect[i]; - if (bucket != null) - bucket.clear(); - _indirect[i] = null; + _direct[i].clear(); + _indirect[i].clear(); } } - private void clearOldestBucket(boolean direct) + protected void releaseMemory(boolean direct) { long oldest = Long.MAX_VALUE; int index = -1; @@ -174,7 +184,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool for (int i = 0; i < buckets.length; ++i) { Bucket bucket = buckets[i]; - if (bucket == null) + if (bucket.isEmpty()) continue; long lastUpdate = bucket.getLastUpdate(); if (lastUpdate < oldest) @@ -186,31 +196,29 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool if (index >= 0) { Bucket bucket = buckets[index]; - buckets[index] = null; - // The same bucket may be concurrently - // removed, so we need this null guard. - if (bucket != null) - bucket.clear(this::decrementMemory); + bucket.clear(); } } - private int bucketFor(int capacity) + protected int bucketFor(int capacity) { - return (capacity - 1) / getCapacityFactor(); + return (int)Math.ceil((double)capacity / getCapacityFactor()); } - private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction newBucket) + protected int capacityFor(int bucket) + { + return bucket * getCapacityFactor(); + } + + private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct) { if (capacity < _minCapacity) return null; - int b = bucketFor(capacity); - if (b >= _direct.length) + int index = bucketFor(capacity) - 1; + if (index >= _direct.length) return null; Bucket[] buckets = bucketsFor(direct); - Bucket bucket = buckets[b]; - if (bucket == null && newBucket != null) - buckets[b] = bucket = newBucket.apply(b + 1); - return bucket; + return buckets[index]; } @ManagedAttribute("The number of pooled direct ByteBuffers") @@ -238,4 +246,47 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool { return direct ? _direct : _indirect; } + + public boolean isDetailedDump() + { + return _detailedDump; + } + + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + List dump = new ArrayList<>(); + dump.add(String.format("HeapMemory: %d/%d", getHeapMemory(), getMaxHeapMemory())); + dump.add(String.format("DirectMemory: %d/%d", getDirectMemory(), getMaxDirectMemory())); + + List indirect = Arrays.stream(_indirect).filter(b -> !b.isEmpty()).collect(Collectors.toList()); + List direct = Arrays.stream(_direct).filter(b -> !b.isEmpty()).collect(Collectors.toList()); + if (isDetailedDump()) + { + dump.add(new DumpableCollection("Indirect Buckets", indirect)); + dump.add(new DumpableCollection("Direct Buckets", direct)); + } + else + { + dump.add("Indirect Buckets size=" + indirect.size()); + dump.add("Direct Buckets size=" + direct.size()); + } + Dumpable.dumpObjects(out, indent, this, dump); + } + + @Override + public String toString() + { + return String.format("%s@%x{minBufferCapacity=%s, maxBufferCapacity=%s, maxQueueLength=%s, factor=%s}", + this.getClass().getSimpleName(), hashCode(), + _minCapacity, + _maxCapacity, + getMaxQueueLength(), + getCapacityFactor()); + } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 6250226c23b..d99a8e3cbaf 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -21,11 +21,12 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; -import java.util.function.Consumer; +import java.util.function.IntConsumer; import org.eclipse.jetty.util.BufferUtil; @@ -160,22 +161,33 @@ public interface ByteBufferPool private final int _maxSize; private final AtomicInteger _size; private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime()); + private final IntConsumer _memoryFunction; + @Deprecated public Bucket(ByteBufferPool pool, int capacity, int maxSize) + { + this(pool, capacity, maxSize, i -> {}); + } + + public Bucket(ByteBufferPool pool, int capacity, int maxSize, IntConsumer memoryFunction) { _pool = pool; _capacity = capacity; _maxSize = maxSize; _size = maxSize > 0 ? new AtomicInteger() : null; + _memoryFunction = Objects.requireNonNull(memoryFunction); } public ByteBuffer acquire() { - ByteBuffer buffer = queuePoll(); - if (buffer == null) - return null; - if (_size != null) - _size.decrementAndGet(); + ByteBuffer buffer = _queue.poll(); + if (buffer != null) + { + if (_size != null) + _size.decrementAndGet(); + _memoryFunction.accept(-buffer.capacity()); + } + return buffer; } @@ -187,59 +199,45 @@ public interface ByteBufferPool @Deprecated public ByteBuffer acquire(boolean direct) { - ByteBuffer buffer = queuePoll(); + ByteBuffer buffer = acquire(); if (buffer == null) return _pool.newByteBuffer(_capacity, direct); - if (_size != null) - _size.decrementAndGet(); return buffer; } public void release(ByteBuffer buffer) { - _lastUpdate.lazySet(System.nanoTime()); + resetUpdateTime(); BufferUtil.clear(buffer); - if (_size == null) - queueOffer(buffer); - else if (_size.incrementAndGet() <= _maxSize) - queueOffer(buffer); + if (_size == null || _size.incrementAndGet() <= _maxSize) + { + _queue.offer(buffer); + _memoryFunction.accept(buffer.capacity()); + } else + { _size.decrementAndGet(); + } + } + + void resetUpdateTime() + { + _lastUpdate.lazySet(System.nanoTime()); } public void clear() - { - clear(null); - } - - void clear(Consumer memoryFn) { int size = _size == null ? 0 : _size.get() - 1; while (size >= 0) { - ByteBuffer buffer = queuePoll(); + ByteBuffer buffer = acquire(); if (buffer == null) break; - if (memoryFn != null) - memoryFn.accept(buffer); if (_size != null) - { - _size.decrementAndGet(); --size; - } } } - private void queueOffer(ByteBuffer buffer) - { - _queue.offer(buffer); - } - - private ByteBuffer queuePoll() - { - return _queue.poll(); - } - boolean isEmpty() { return _queue.isEmpty(); @@ -258,7 +256,7 @@ public interface ByteBufferPool @Override public String toString() { - return String.format("%s@%x{%d/%d@%d}", getClass().getSimpleName(), hashCode(), size(), _maxSize, _capacity); + return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/LogarithmicArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/LogarithmicArrayByteBufferPool.java new file mode 100644 index 00000000000..434c51d44f0 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/LogarithmicArrayByteBufferPool.java @@ -0,0 +1,112 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.io; + +/** + * Extension of the {@link ArrayByteBufferPool} whose bucket sizes increase exponentially instead of linearly. + * Each bucket will be double the size of the previous bucket, this decreases the amounts of buckets required + * which can lower total memory usage if buffers are often being acquired of different sizes. However as there are + * fewer buckets this will also increase the contention on each bucket. + */ +public class LogarithmicArrayByteBufferPool extends ArrayByteBufferPool +{ + /** + * Creates a new ByteBufferPool with a default configuration. + */ + public LogarithmicArrayByteBufferPool() + { + this(-1, -1, -1); + } + + /** + * Creates a new ByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param maxCapacity the maximum ByteBuffer capacity + */ + public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity) + { + this(minCapacity, maxCapacity, -1, -1, -1); + } + + /** + * Creates a new ByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxQueueLength the maximum ByteBuffer queue length + */ + public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength) + { + this(minCapacity, maxCapacity, maxQueueLength, -1, -1); + } + + /** + * Creates a new ByteBufferPool with the given configuration. + * + * @param minCapacity the minimum ByteBuffer capacity + * @param maxCapacity the maximum ByteBuffer capacity + * @param maxQueueLength the maximum ByteBuffer queue length + * @param maxHeapMemory the max heap memory in bytes + * @param maxDirectMemory the max direct memory in bytes + */ + public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory) + { + super(minCapacity, 1, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory); + } + + @Override + protected int bucketFor(int capacity) + { + return 32 - Integer.numberOfLeadingZeros(capacity - 1); + } + + @Override + protected int capacityFor(int bucket) + { + return 1 << bucket; + } + + @Override + protected void releaseMemory(boolean direct) + { + long oldest = Long.MAX_VALUE; + int index = -1; + Bucket[] buckets = bucketsFor(direct); + for (int i = 0; i < buckets.length; ++i) + { + Bucket bucket = buckets[i]; + if (bucket.isEmpty()) + continue; + long lastUpdate = bucket.getLastUpdate(); + if (lastUpdate < oldest) + { + oldest = lastUpdate; + index = i; + } + } + if (index >= 0) + { + Bucket bucket = buckets[index]; + // Acquire a buffer but never return it to the pool. + bucket.acquire(); + bucket.resetUpdateTime(); + } + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java index 493b15e3bcb..567a552516f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java @@ -18,7 +18,10 @@ package org.eclipse.jetty.io; +import java.io.IOException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -28,6 +31,8 @@ import java.util.function.Function; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.component.DumpableCollection; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -38,13 +43,14 @@ import org.eclipse.jetty.util.log.Logger; * queue of ByteBuffers each of capacity 2048, and so on.

*/ @ManagedObject -public class MappedByteBufferPool extends AbstractByteBufferPool +public class MappedByteBufferPool extends AbstractByteBufferPool implements Dumpable { private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class); private final ConcurrentMap _directBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap _heapBuffers = new ConcurrentHashMap<>(); private final Function _newBucket; + private boolean _detailedDump = false; /** * Creates a new MappedByteBufferPool with a default configuration. @@ -84,7 +90,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool */ public MappedByteBufferPool(int factor, int maxQueueLength, Function newBucket) { - this(factor, maxQueueLength, newBucket, -1, -1); + this(factor, maxQueueLength, newBucket, 0, 0); } /** @@ -93,25 +99,25 @@ public class MappedByteBufferPool extends AbstractByteBufferPool * @param factor the capacity factor * @param maxQueueLength the maximum ByteBuffer queue length * @param newBucket the function that creates a Bucket - * @param maxHeapMemory the max heap memory in bytes - * @param maxDirectMemory the max direct memory in bytes + * @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic. + * @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic. */ public MappedByteBufferPool(int factor, int maxQueueLength, Function newBucket, long maxHeapMemory, long maxDirectMemory) { super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory); - _newBucket = newBucket != null ? newBucket : this::newBucket; + _newBucket = newBucket; } - private Bucket newBucket(int key) + private Bucket newBucket(int key, boolean direct) { - return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength()); + return (_newBucket != null) ? _newBucket.apply(key) : new Bucket(this, capacityFor(key), getMaxQueueLength(), updateMemory(direct)); } @Override public ByteBuffer acquire(int size, boolean direct) { int b = bucketFor(size); - int capacity = b * getCapacityFactor(); + int capacity = capacityFor(b); ConcurrentMap buffers = bucketsFor(direct); Bucket bucket = buffers.get(b); if (bucket == null) @@ -119,7 +125,6 @@ public class MappedByteBufferPool extends AbstractByteBufferPool ByteBuffer buffer = bucket.acquire(); if (buffer == null) return newByteBuffer(capacity, direct); - decrementMemory(buffer); return buffer; } @@ -130,21 +135,20 @@ public class MappedByteBufferPool extends AbstractByteBufferPool return; // nothing to do int capacity = buffer.capacity(); + int b = bucketFor(capacity); // Validate that this buffer is from this pool. - if ((capacity % getCapacityFactor()) != 0) + if (capacity != capacityFor(b)) { if (LOG.isDebugEnabled()) LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer)); return; } - int b = bucketFor(capacity); boolean direct = buffer.isDirect(); ConcurrentMap buckets = bucketsFor(direct); - Bucket bucket = buckets.computeIfAbsent(b, _newBucket); + Bucket bucket = buckets.computeIfAbsent(b, i -> newBucket(i, direct)); bucket.release(buffer); - incrementMemory(buffer); - releaseExcessMemory(direct, this::clearOldestBucket); + releaseExcessMemory(direct, this::releaseMemory); } @Override @@ -157,7 +161,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool _heapBuffers.clear(); } - private void clearOldestBucket(boolean direct) + protected void releaseMemory(boolean direct) { long oldest = Long.MAX_VALUE; int index = -1; @@ -165,6 +169,9 @@ public class MappedByteBufferPool extends AbstractByteBufferPool for (Map.Entry entry : buckets.entrySet()) { Bucket bucket = entry.getValue(); + if (bucket.isEmpty()) + continue; + long lastUpdate = bucket.getLastUpdate(); if (lastUpdate < oldest) { @@ -174,21 +181,21 @@ public class MappedByteBufferPool extends AbstractByteBufferPool } if (index >= 0) { - Bucket bucket = buckets.remove(index); - // The same bucket may be concurrently - // removed, so we need this null guard. + Bucket bucket = buckets.get(index); + // Null guard in case this.clear() is called concurrently. if (bucket != null) - bucket.clear(this::decrementMemory); + bucket.clear(); } } - private int bucketFor(int size) + protected int bucketFor(int capacity) { - int factor = getCapacityFactor(); - int bucket = size / factor; - if (bucket * factor != size) - ++bucket; - return bucket; + return (int)Math.ceil((double)capacity / getCapacityFactor()); + } + + protected int capacityFor(int bucket) + { + return bucket * getCapacityFactor(); } @ManagedAttribute("The number of pooled direct ByteBuffers") @@ -231,4 +238,43 @@ public class MappedByteBufferPool extends AbstractByteBufferPool return slice; } } + + public boolean isDetailedDump() + { + return _detailedDump; + } + + public void setDetailedDump(boolean detailedDump) + { + _detailedDump = detailedDump; + } + + @Override + public void dump(Appendable out, String indent) throws IOException + { + List dump = new ArrayList<>(); + dump.add(String.format("HeapMemory: %d/%d", getHeapMemory(), getMaxHeapMemory())); + dump.add(String.format("DirectMemory: %d/%d", getDirectMemory(), getMaxDirectMemory())); + + if (isDetailedDump()) + { + dump.add(new DumpableCollection("Indirect Buckets", _heapBuffers.values())); + dump.add(new DumpableCollection("Direct Buckets", _directBuffers.values())); + } + else + { + dump.add("Indirect Buckets size=" + _heapBuffers.size()); + dump.add("Direct Buckets size=" + _directBuffers.size()); + } + Dumpable.dumpObjects(out, indent, this, dump); + } + + @Override + public String toString() + { + return String.format("%s@%x{maxQueueLength=%s, factor=%s}", + this.getClass().getSimpleName(), hashCode(), + getMaxQueueLength(), + getCapacityFactor()); + } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 705f29e4086..8089fea5b3b 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -28,11 +28,11 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.greaterThanOrEqualTo; +import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -69,10 +69,13 @@ public class ArrayByteBufferPoolTest @Test public void testMaxRelease() { - ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000); + int minCapacity = 10; + int factor = 1; + int maxCapacity = 1024; + ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(minCapacity, factor, maxCapacity); ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true); - for (int size = 999; size <= 1001; size++) + for (int size = maxCapacity - 1; size <= maxCapacity + 1; size++) { bufferPool.clear(); ByteBuffer buffer = bufferPool.acquire(size, true); @@ -91,7 +94,11 @@ public class ArrayByteBufferPoolTest .filter(Objects::nonNull) .mapToInt(Bucket::size) .sum(); - assertEquals(size <= 1000, 1 == pooled); + + if (size <= maxCapacity) + assertThat(pooled, is(1)); + else + assertThat(pooled, is(0)); } } @@ -215,7 +222,7 @@ public class ArrayByteBufferPoolTest // Now the oldest buffer should be gone and we have: 1+2x2+3=8 long memory = bufferPool.getMemory(true); assertThat(memory, lessThan((long)maxMemory)); - assertNull(buckets[3]); + assertTrue(buckets[3].isEmpty()); // Create and release a large buffer. // Max memory is exceeded and buckets 3 and 1 are cleared. @@ -224,7 +231,7 @@ public class ArrayByteBufferPoolTest bufferPool.release(buffer); memory = bufferPool.getMemory(true); assertThat(memory, lessThanOrEqualTo((long)maxMemory)); - assertNull(buckets[0]); - assertNull(buckets[2]); + assertTrue(buckets[0].isEmpty()); + assertTrue(buckets[2].isEmpty()); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java index 27ec5eb2899..f6e6a110acf 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java @@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -164,7 +163,7 @@ public class MappedByteBufferPoolTest // Now the oldest buffer should be gone and we have: 1+2x2+3=8 long memory = bufferPool.getMemory(true); assertThat(memory, lessThan((long)maxMemory)); - assertNull(buckets.get(4)); + assertTrue(buckets.get(4).isEmpty()); // Create and release a large buffer. // Max memory is exceeded and buckets 3 and 1 are cleared. @@ -173,7 +172,7 @@ public class MappedByteBufferPoolTest bufferPool.release(buffer); memory = bufferPool.getMemory(true); assertThat(memory, lessThanOrEqualTo((long)maxMemory)); - assertNull(buckets.get(1)); - assertNull(buckets.get(3)); + assertTrue(buckets.get(1).isEmpty()); + assertTrue(buckets.get(3).isEmpty()); } } diff --git a/jetty-server/src/main/config/etc/jetty-bytebufferpool-logarithmic.xml b/jetty-server/src/main/config/etc/jetty-bytebufferpool-logarithmic.xml new file mode 100644 index 00000000000..eff387d7f6c --- /dev/null +++ b/jetty-server/src/main/config/etc/jetty-bytebufferpool-logarithmic.xml @@ -0,0 +1,10 @@ + + + + + + + + + + diff --git a/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml b/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml index ab8f4a35065..610aabe3068 100644 --- a/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml +++ b/jetty-server/src/main/config/etc/jetty-bytebufferpool.xml @@ -5,7 +5,7 @@ - - + + diff --git a/jetty-server/src/main/config/modules/bytebufferpool-logarithmic.mod b/jetty-server/src/main/config/modules/bytebufferpool-logarithmic.mod new file mode 100644 index 00000000000..77f816c9bbd --- /dev/null +++ b/jetty-server/src/main/config/modules/bytebufferpool-logarithmic.mod @@ -0,0 +1,30 @@ +# DO NOT EDIT - See: https://www.eclipse.org/jetty/documentation/current/startup-modules.html + +[description] +Configures the ByteBufferPool used by ServerConnectors whose bucket sizes increase exponentially instead of linearly. + +[tags] +bytebufferpool + +[provides] +bytebufferpool + +[xml] +etc/jetty-bytebufferpool-logarithmic.xml + +[ini-template] +### Server ByteBufferPool Configuration +## Minimum capacity to pool ByteBuffers +#jetty.byteBufferPool.minCapacity=0 + +## Maximum capacity to pool ByteBuffers +#jetty.byteBufferPool.maxCapacity=65536 + +## Maximum queue length for each bucket (-1 for unbounded) +#jetty.byteBufferPool.maxQueueLength=-1 + +## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited) +#jetty.byteBufferPool.maxHeapMemory=0 + +## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited) +#jetty.byteBufferPool.maxDirectMemory=0 diff --git a/jetty-server/src/main/config/modules/bytebufferpool.mod b/jetty-server/src/main/config/modules/bytebufferpool.mod index 8d31ececa97..7106f583354 100644 --- a/jetty-server/src/main/config/modules/bytebufferpool.mod +++ b/jetty-server/src/main/config/modules/bytebufferpool.mod @@ -3,6 +3,9 @@ [description] Configures the ByteBufferPool used by ServerConnectors. +[tags] +bytebufferpool + [xml] etc/jetty-bytebufferpool.xml @@ -20,8 +23,8 @@ etc/jetty-bytebufferpool.xml ## Maximum queue length for each bucket (-1 for unbounded) #jetty.byteBufferPool.maxQueueLength=-1 -## Maximum heap memory retainable by the pool (-1 for unlimited) -#jetty.byteBufferPool.maxHeapMemory=-1 +## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited) +#jetty.byteBufferPool.maxHeapMemory=0 -## Maximum direct memory retainable by the pool (-1 for unlimited) -#jetty.byteBufferPool.maxDirectMemory=-1 +## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited) +#jetty.byteBufferPool.maxDirectMemory=0 diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketBufferPoolTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketBufferPoolTest.java new file mode 100644 index 00000000000..f97b5a652fb --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/WebSocketBufferPoolTest.java @@ -0,0 +1,248 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.io.IOException; +import java.lang.management.ManagementFactory; +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.io.ArrayByteBufferPool; +import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool; +import org.eclipse.jetty.io.NullByteBufferPool; +import org.eclipse.jetty.jmx.MBeanContainer; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.ClientUpgradeRequest; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.NativeWebSocketConfiguration; +import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.lessThanOrEqualTo; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class WebSocketBufferPoolTest +{ + private static final Logger LOG = Log.getLogger(WebSocketBufferPoolTest.class); + + private static final char[] ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789{}\":;<>,.()[]".toCharArray(); + private static final AtomicReference _latchReference = new AtomicReference<>(); + private Server _server; + private ArrayByteBufferPool _bufferPool; + private HttpClient _httpClient; + private WebSocketClient _websocketClient; + + @WebSocket + public static class ServerSocket + { + @OnWebSocketMessage + public void onMessage(Session session, String message) throws InterruptedException + { + CountDownLatch latch = _latchReference.get(); + latch.countDown(); + assertTrue(latch.await(20, TimeUnit.SECONDS)); + session.close(1000, "success"); + } + } + + @WebSocket + public static class ClientSocket + { + private int code; + private String reason; + private final CountDownLatch closeLatch = new CountDownLatch(1); + + @OnWebSocketMessage + public void onMessage(Session session, String message) + { + if (LOG.isDebugEnabled()) + LOG.debug("MessageSize: {}", message.length()); + } + + @OnWebSocketError + public void onError(Throwable t) + { + t.printStackTrace(); + } + + @OnWebSocketClose + public void onClose(int code, String status) + { + this.code = code; + this.reason = status; + closeLatch.countDown(); + } + } + + public String randomString(int len) + { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < len; i++) + { + sb.append(ALPHABET[(int)(Math.random() * ALPHABET.length)]); + } + return sb.toString(); + } + + @BeforeEach + public void before() throws Exception + { + // Ensure the threadPool can handle more than 100 threads. + QueuedThreadPool threadPool = new QueuedThreadPool(200); + + _server = new Server(threadPool); + int maxMemory = 1024 * 1024 * 16; + _bufferPool = new LogarithmicArrayByteBufferPool(-1, -1, -1, maxMemory, maxMemory); + _bufferPool.setDetailedDump(true); + _server.addBean(_bufferPool); + + ServerConnector connector = new ServerConnector(_server); + connector.setPort(8080); + _server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(); + WebSocketUpgradeFilter.configure(contextHandler); + NativeWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, configuration) -> + { + WebSocketPolicy policy = configuration.getPolicy(); + policy.setMaxTextMessageBufferSize(Integer.MAX_VALUE); + policy.setMaxTextMessageSize(Integer.MAX_VALUE); + configuration.addMapping("/websocket", ServerSocket.class); + })); + + contextHandler.addServlet(new ServletHolder(new HttpServlet() + { + @Override + protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException + { + CountDownLatch countDownLatch = _latchReference.get(); + if (countDownLatch != null) + assertThat(countDownLatch.getCount(), is(0L)); + + int numThreads = Integer.parseInt(req.getParameter("numThreads")); + _latchReference.compareAndSet(countDownLatch, new CountDownLatch(numThreads)); + } + }), "/setCount"); + + _server.setHandler(contextHandler); + _server.addBean(new MBeanContainer(ManagementFactory.getPlatformMBeanServer())); + _server.start(); + + _httpClient = new HttpClient(); + _httpClient.setByteBufferPool(new NullByteBufferPool()); + _websocketClient = new WebSocketClient(_httpClient); + _websocketClient.start(); + + // Check the bufferPool used for the server is now used in the websocket configuration. + NativeWebSocketConfiguration config = (NativeWebSocketConfiguration)contextHandler.getAttribute(NativeWebSocketConfiguration.class.getName()); + assertNotNull(config); + assertThat(config.getFactory().getBufferPool(), is(_bufferPool)); + } + + @AfterEach + public void after() throws Exception + { + _websocketClient.stop(); + _server.stop(); + } + + @Test + public void test() throws Exception + { + int numThreads = 100; + int maxMessageSize = 1024 * 64; + for (int msgSize = 1024; msgSize < maxMessageSize; msgSize += 1024) + { + ContentResponse get = _httpClient.GET("http://localhost:8080/setCount?numThreads=" + numThreads); + assertThat(get.getStatus(), is(200)); + + Callback.Completable completable = new Callback.Completable() + { + final AtomicInteger count = new AtomicInteger(numThreads); + + @Override + public void succeeded() + { + if (count.decrementAndGet() == 0) + super.succeeded(); + } + }; + + int messageSize = msgSize; + for (int i = 0; i < numThreads; i++) + { + new Thread(() -> + { + try + { + ClientSocket clientSocket = new ClientSocket(); + URI uri = URI.create("ws://localhost:8080/websocket"); + ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest(); + upgradeRequest.addExtensions("permessage-deflate"); + Session session = _websocketClient.connect(clientSocket, uri, upgradeRequest).get(5, TimeUnit.SECONDS); + assertTrue(session.getUpgradeResponse().getExtensions().stream().anyMatch(config -> config.getName().equals("permessage-deflate"))); + + session.getRemote().sendString(randomString(messageSize)); + assertTrue(clientSocket.closeLatch.await(20, TimeUnit.SECONDS)); + assertThat(clientSocket.code, is(1000)); + assertThat(clientSocket.reason, is("success")); + completable.complete(null); + } + catch (Throwable t) + { + completable.failed(t); + } + }).start(); + } + + completable.get(30, TimeUnit.SECONDS); + } + + assertThat(_bufferPool.getDirectMemory(), lessThanOrEqualTo(_bufferPool.getMaxDirectMemory())); + assertThat(_bufferPool.getHeapMemory(), lessThanOrEqualTo(_bufferPool.getMaxHeapMemory())); + } +} diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 0fb43e93450..52da40decd0 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -48,6 +48,7 @@ import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.util.DecoratedObjectFactory; import org.eclipse.jetty.util.DeprecationWarning; import org.eclipse.jetty.util.StringUtil; @@ -119,7 +120,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc public WebSocketServerFactory(ServletContext context) { - this(context, WebSocketPolicy.newServerPolicy(), new MappedByteBufferPool()); + this(context, WebSocketPolicy.newServerPolicy(), null); } public WebSocketServerFactory(ServletContext context, ByteBufferPool bufferPool) @@ -135,7 +136,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc */ public WebSocketServerFactory(ServletContext context, WebSocketPolicy policy) { - this(context, policy, new MappedByteBufferPool()); + this(context, policy, null); } public WebSocketServerFactory(ServletContext context, WebSocketPolicy policy, ByteBufferPool bufferPool) @@ -161,7 +162,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc this.defaultPolicy = policy; this.objectFactory = objectFactory; this.executor = executor; - this.bufferPool = bufferPool; this.creator = this; this.contextClassloader = Thread.currentThread().getContextClassLoader(); @@ -201,6 +201,21 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc this.handshakes.put(HandshakeRFC6455.VERSION, new HandshakeRFC6455()); this.sessionFactories.add(new WebSocketSessionFactory(this)); + if (bufferPool == null) + { + ContextHandler contextHandler = ServletContextHandler.getContextHandler(context); + if (contextHandler != null) + { + Server server = contextHandler.getServer(); + if (server != null) + bufferPool = server.getBean(ByteBufferPool.class); + } + if (bufferPool == null) + bufferPool = new MappedByteBufferPool(); + } + this.bufferPool = bufferPool; + addBean(bufferPool); + // Create supportedVersions List versions = new ArrayList<>(handshakes.keySet()); versions.sort(Collections.reverseOrder()); // newest first @@ -216,7 +231,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc supportedVersions = rv.toString(); addBean(scheduler); - addBean(bufferPool); addBean(sessionTracker); addBean(extensionFactory); listeners.add(this.sessionTracker);