Issue #6974 - improvements & fixes to ByteBufferPool implementations (#7017)

- WebSocket should user server ByteBufferPool if possible
- fix various bugs ByteBufferPool implementations
- add heuristic for maxHeapMemory and maxDirectMemory
- Add dump for ByteBufferPools
- add LogArrayByteBufferPool that does exponential scaling of bucket size.
- ByteBufferPools should default to use maxMemory heuristic
- Add module jetty-bytebufferpool-logarithmic

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Lachlan 2021-11-25 10:09:10 +11:00 committed by GitHub
parent 11c6abdf6f
commit f86a719bce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 671 additions and 125 deletions

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -32,16 +33,24 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
private final int _factor;
private final int _maxQueueLength;
private final long _maxHeapMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final long _maxDirectMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final AtomicLong _directMemory = new AtomicLong();
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
_maxHeapMemory = (maxHeapMemory != 0) ? maxHeapMemory : Runtime.getRuntime().maxMemory() / 4;
_maxDirectMemory = (maxDirectMemory != 0) ? maxDirectMemory : Runtime.getRuntime().maxMemory() / 4;
}
protected int getCapacityFactor()
@ -54,11 +63,13 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
return _maxQueueLength;
}
@Deprecated
protected void decrementMemory(ByteBuffer buffer)
{
updateMemory(buffer, false);
}
@Deprecated
protected void incrementMemory(ByteBuffer buffer)
{
updateMemory(buffer, true);
@ -95,12 +106,29 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
return getMemory(false);
}
@ManagedAttribute("The max num of bytes that can be retained from direct ByteBuffers")
public long getMaxDirectMemory()
{
return _maxDirectMemory;
}
@ManagedAttribute("The max num of bytes that can be retained from heap ByteBuffers")
public long getMaxHeapMemory()
{
return _maxHeapMemory;
}
public long getMemory(boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
return memory.get();
}
IntConsumer updateMemory(boolean direct)
{
return (direct) ? _directMemory::addAndGet : _heapMemory::addAndGet;
}
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{

View File

@ -18,14 +18,19 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.function.IntFunction;
import java.util.stream.Collectors;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -36,13 +41,15 @@ import org.eclipse.jetty.util.log.Logger;
* 2048, and so on.</p>
*/
@ManagedObject
public class ArrayByteBufferPool extends AbstractByteBufferPool
public class ArrayByteBufferPool extends AbstractByteBufferPool implements Dumpable
{
private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class);
private final int _maxCapacity;
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
private boolean _detailedDump = false;
/**
* Creates a new ArrayByteBufferPool with a default configuration.
@ -61,7 +68,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
{
this(minCapacity, factor, maxCapacity, -1, -1, -1);
this(minCapacity, factor, maxCapacity, -1, 0, 0);
}
/**
@ -74,7 +81,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{
this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1);
this(minCapacity, factor, maxCapacity, maxQueueLength, 0, 0);
}
/**
@ -84,8 +91,8 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
@ -98,24 +105,30 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_maxCapacity = maxCapacity;
_minCapacity = minCapacity;
int length = maxCapacity / factor;
// Initialize all buckets in constructor and never modify the array again.
int length = bucketFor(maxCapacity);
_direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length];
for (int i = 0; i < length; i++)
{
_direct[i] = newBucket(i + 1, true);
_indirect[i] = newBucket(i + 1, false);
}
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
int capacity = size < _minCapacity ? size : capacityFor(bucketFor(size));
ByteBufferPool.Bucket bucket = bucketFor(size, direct);
if (bucket == null)
return newByteBuffer(capacity, direct);
ByteBuffer buffer = bucket.acquire();
if (buffer == null)
return newByteBuffer(capacity, direct);
decrementMemory(buffer);
return buffer;
}
@ -127,26 +140,29 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
int capacity = buffer.capacity();
// Validate that this buffer is from this pool.
if ((capacity % getCapacityFactor()) != 0)
if (capacity != capacityFor(bucketFor(capacity)))
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}
// Don't release into the pool if greater than the maximum ByteBuffer capacity.
if (capacity > _maxCapacity)
return;
boolean direct = buffer.isDirect();
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct, this::newBucket);
ByteBufferPool.Bucket bucket = bucketFor(capacity, direct);
if (bucket != null)
{
bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
releaseExcessMemory(direct, this::releaseMemory);
}
}
private Bucket newBucket(int key)
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
return new Bucket(this, capacityFor(key), getMaxQueueLength(), updateMemory(direct));
}
@Override
@ -155,18 +171,12 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
super.clear();
for (int i = 0; i < _direct.length; ++i)
{
Bucket bucket = _direct[i];
if (bucket != null)
bucket.clear();
_direct[i] = null;
bucket = _indirect[i];
if (bucket != null)
bucket.clear();
_indirect[i] = null;
_direct[i].clear();
_indirect[i].clear();
}
}
private void clearOldestBucket(boolean direct)
protected void releaseMemory(boolean direct)
{
long oldest = Long.MAX_VALUE;
int index = -1;
@ -174,7 +184,7 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
for (int i = 0; i < buckets.length; ++i)
{
Bucket bucket = buckets[i];
if (bucket == null)
if (bucket.isEmpty())
continue;
long lastUpdate = bucket.getLastUpdate();
if (lastUpdate < oldest)
@ -186,31 +196,29 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
if (index >= 0)
{
Bucket bucket = buckets[index];
buckets[index] = null;
// The same bucket may be concurrently
// removed, so we need this null guard.
if (bucket != null)
bucket.clear(this::decrementMemory);
bucket.clear();
}
}
private int bucketFor(int capacity)
protected int bucketFor(int capacity)
{
return (capacity - 1) / getCapacityFactor();
return (int)Math.ceil((double)capacity / getCapacityFactor());
}
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction<Bucket> newBucket)
protected int capacityFor(int bucket)
{
return bucket * getCapacityFactor();
}
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct)
{
if (capacity < _minCapacity)
return null;
int b = bucketFor(capacity);
if (b >= _direct.length)
int index = bucketFor(capacity) - 1;
if (index >= _direct.length)
return null;
Bucket[] buckets = bucketsFor(direct);
Bucket bucket = buckets[b];
if (bucket == null && newBucket != null)
buckets[b] = bucket = newBucket.apply(b + 1);
return bucket;
return buckets[index];
}
@ManagedAttribute("The number of pooled direct ByteBuffers")
@ -238,4 +246,47 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
{
return direct ? _direct : _indirect;
}
public boolean isDetailedDump()
{
return _detailedDump;
}
public void setDetailedDump(boolean detailedDump)
{
_detailedDump = detailedDump;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Object> dump = new ArrayList<>();
dump.add(String.format("HeapMemory: %d/%d", getHeapMemory(), getMaxHeapMemory()));
dump.add(String.format("DirectMemory: %d/%d", getDirectMemory(), getMaxDirectMemory()));
List<Bucket> indirect = Arrays.stream(_indirect).filter(b -> !b.isEmpty()).collect(Collectors.toList());
List<Bucket> direct = Arrays.stream(_direct).filter(b -> !b.isEmpty()).collect(Collectors.toList());
if (isDetailedDump())
{
dump.add(new DumpableCollection("Indirect Buckets", indirect));
dump.add(new DumpableCollection("Direct Buckets", direct));
}
else
{
dump.add("Indirect Buckets size=" + indirect.size());
dump.add("Direct Buckets size=" + direct.size());
}
Dumpable.dumpObjects(out, indent, this, dump);
}
@Override
public String toString()
{
return String.format("%s@%x{minBufferCapacity=%s, maxBufferCapacity=%s, maxQueueLength=%s, factor=%s}",
this.getClass().getSimpleName(), hashCode(),
_minCapacity,
_maxCapacity,
getMaxQueueLength(),
getCapacityFactor());
}
}

View File

@ -21,11 +21,12 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.IntConsumer;
import org.eclipse.jetty.util.BufferUtil;
@ -160,22 +161,33 @@ public interface ByteBufferPool
private final int _maxSize;
private final AtomicInteger _size;
private final AtomicLong _lastUpdate = new AtomicLong(System.nanoTime());
private final IntConsumer _memoryFunction;
@Deprecated
public Bucket(ByteBufferPool pool, int capacity, int maxSize)
{
this(pool, capacity, maxSize, i -> {});
}
public Bucket(ByteBufferPool pool, int capacity, int maxSize, IntConsumer memoryFunction)
{
_pool = pool;
_capacity = capacity;
_maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
_memoryFunction = Objects.requireNonNull(memoryFunction);
}
public ByteBuffer acquire()
{
ByteBuffer buffer = queuePoll();
if (buffer == null)
return null;
if (_size != null)
_size.decrementAndGet();
ByteBuffer buffer = _queue.poll();
if (buffer != null)
{
if (_size != null)
_size.decrementAndGet();
_memoryFunction.accept(-buffer.capacity());
}
return buffer;
}
@ -187,59 +199,45 @@ public interface ByteBufferPool
@Deprecated
public ByteBuffer acquire(boolean direct)
{
ByteBuffer buffer = queuePoll();
ByteBuffer buffer = acquire();
if (buffer == null)
return _pool.newByteBuffer(_capacity, direct);
if (_size != null)
_size.decrementAndGet();
return buffer;
}
public void release(ByteBuffer buffer)
{
_lastUpdate.lazySet(System.nanoTime());
resetUpdateTime();
BufferUtil.clear(buffer);
if (_size == null)
queueOffer(buffer);
else if (_size.incrementAndGet() <= _maxSize)
queueOffer(buffer);
if (_size == null || _size.incrementAndGet() <= _maxSize)
{
_queue.offer(buffer);
_memoryFunction.accept(buffer.capacity());
}
else
{
_size.decrementAndGet();
}
}
void resetUpdateTime()
{
_lastUpdate.lazySet(System.nanoTime());
}
public void clear()
{
clear(null);
}
void clear(Consumer<ByteBuffer> memoryFn)
{
int size = _size == null ? 0 : _size.get() - 1;
while (size >= 0)
{
ByteBuffer buffer = queuePoll();
ByteBuffer buffer = acquire();
if (buffer == null)
break;
if (memoryFn != null)
memoryFn.accept(buffer);
if (_size != null)
{
_size.decrementAndGet();
--size;
}
}
}
private void queueOffer(ByteBuffer buffer)
{
_queue.offer(buffer);
}
private ByteBuffer queuePoll()
{
return _queue.poll();
}
boolean isEmpty()
{
return _queue.isEmpty();
@ -258,7 +256,7 @@ public interface ByteBufferPool
@Override
public String toString()
{
return String.format("%s@%x{%d/%d@%d}", getClass().getSimpleName(), hashCode(), size(), _maxSize, _capacity);
return String.format("%s@%x{capacity=%d, size=%d, maxSize=%d}", getClass().getSimpleName(), hashCode(), _capacity, size(), _maxSize);
}
}
}

View File

@ -0,0 +1,112 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
/**
* Extension of the {@link ArrayByteBufferPool} whose bucket sizes increase exponentially instead of linearly.
* Each bucket will be double the size of the previous bucket, this decreases the amounts of buckets required
* which can lower total memory usage if buffers are often being acquired of different sizes. However as there are
* fewer buckets this will also increase the contention on each bucket.
*/
public class LogarithmicArrayByteBufferPool extends ArrayByteBufferPool
{
/**
* Creates a new ByteBufferPool with a default configuration.
*/
public LogarithmicArrayByteBufferPool()
{
this(-1, -1, -1);
}
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param maxCapacity the maximum ByteBuffer capacity
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity)
{
this(minCapacity, maxCapacity, -1, -1, -1);
}
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength)
{
this(minCapacity, maxCapacity, maxQueueLength, -1, -1);
}
/**
* Creates a new ByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
*/
public LogarithmicArrayByteBufferPool(int minCapacity, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
super(minCapacity, 1, maxCapacity, maxQueueLength, maxHeapMemory, maxDirectMemory);
}
@Override
protected int bucketFor(int capacity)
{
return 32 - Integer.numberOfLeadingZeros(capacity - 1);
}
@Override
protected int capacityFor(int bucket)
{
return 1 << bucket;
}
@Override
protected void releaseMemory(boolean direct)
{
long oldest = Long.MAX_VALUE;
int index = -1;
Bucket[] buckets = bucketsFor(direct);
for (int i = 0; i < buckets.length; ++i)
{
Bucket bucket = buckets[i];
if (bucket.isEmpty())
continue;
long lastUpdate = bucket.getLastUpdate();
if (lastUpdate < oldest)
{
oldest = lastUpdate;
index = i;
}
}
if (index >= 0)
{
Bucket bucket = buckets[index];
// Acquire a buffer but never return it to the pool.
bucket.acquire();
bucket.resetUpdateTime();
}
}
}

View File

@ -18,7 +18,10 @@
package org.eclipse.jetty.io;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -28,6 +31,8 @@ import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -38,13 +43,14 @@ import org.eclipse.jetty.util.log.Logger;
* queue of ByteBuffers each of capacity 2048, and so on.</p>
*/
@ManagedObject
public class MappedByteBufferPool extends AbstractByteBufferPool
public class MappedByteBufferPool extends AbstractByteBufferPool implements Dumpable
{
private static final Logger LOG = Log.getLogger(MappedByteBufferPool.class);
private final ConcurrentMap<Integer, Bucket> _directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Bucket> _heapBuffers = new ConcurrentHashMap<>();
private final Function<Integer, Bucket> _newBucket;
private boolean _detailedDump = false;
/**
* Creates a new MappedByteBufferPool with a default configuration.
@ -84,7 +90,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket)
{
this(factor, maxQueueLength, newBucket, -1, -1);
this(factor, maxQueueLength, newBucket, 0, 0);
}
/**
@ -93,25 +99,25 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param newBucket the function that creates a Bucket
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
* @param maxHeapMemory the max heap memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
* @param maxDirectMemory the max direct memory in bytes, -1 for unlimited memory or 0 to use default heuristic.
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
_newBucket = newBucket != null ? newBucket : this::newBucket;
_newBucket = newBucket;
}
private Bucket newBucket(int key)
private Bucket newBucket(int key, boolean direct)
{
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
return (_newBucket != null) ? _newBucket.apply(key) : new Bucket(this, capacityFor(key), getMaxQueueLength(), updateMemory(direct));
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
int b = bucketFor(size);
int capacity = b * getCapacityFactor();
int capacity = capacityFor(b);
ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
Bucket bucket = buffers.get(b);
if (bucket == null)
@ -119,7 +125,6 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
ByteBuffer buffer = bucket.acquire();
if (buffer == null)
return newByteBuffer(capacity, direct);
decrementMemory(buffer);
return buffer;
}
@ -130,21 +135,20 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
return; // nothing to do
int capacity = buffer.capacity();
int b = bucketFor(capacity);
// Validate that this buffer is from this pool.
if ((capacity % getCapacityFactor()) != 0)
if (capacity != capacityFor(b))
{
if (LOG.isDebugEnabled())
LOG.debug("ByteBuffer {} does not belong to this pool, discarding it", BufferUtil.toDetailString(buffer));
return;
}
int b = bucketFor(capacity);
boolean direct = buffer.isDirect();
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
Bucket bucket = buckets.computeIfAbsent(b, _newBucket);
Bucket bucket = buckets.computeIfAbsent(b, i -> newBucket(i, direct));
bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
releaseExcessMemory(direct, this::releaseMemory);
}
@Override
@ -157,7 +161,7 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
_heapBuffers.clear();
}
private void clearOldestBucket(boolean direct)
protected void releaseMemory(boolean direct)
{
long oldest = Long.MAX_VALUE;
int index = -1;
@ -165,6 +169,9 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
for (Map.Entry<Integer, Bucket> entry : buckets.entrySet())
{
Bucket bucket = entry.getValue();
if (bucket.isEmpty())
continue;
long lastUpdate = bucket.getLastUpdate();
if (lastUpdate < oldest)
{
@ -174,21 +181,21 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
}
if (index >= 0)
{
Bucket bucket = buckets.remove(index);
// The same bucket may be concurrently
// removed, so we need this null guard.
Bucket bucket = buckets.get(index);
// Null guard in case this.clear() is called concurrently.
if (bucket != null)
bucket.clear(this::decrementMemory);
bucket.clear();
}
}
private int bucketFor(int size)
protected int bucketFor(int capacity)
{
int factor = getCapacityFactor();
int bucket = size / factor;
if (bucket * factor != size)
++bucket;
return bucket;
return (int)Math.ceil((double)capacity / getCapacityFactor());
}
protected int capacityFor(int bucket)
{
return bucket * getCapacityFactor();
}
@ManagedAttribute("The number of pooled direct ByteBuffers")
@ -231,4 +238,43 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
return slice;
}
}
public boolean isDetailedDump()
{
return _detailedDump;
}
public void setDetailedDump(boolean detailedDump)
{
_detailedDump = detailedDump;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Object> dump = new ArrayList<>();
dump.add(String.format("HeapMemory: %d/%d", getHeapMemory(), getMaxHeapMemory()));
dump.add(String.format("DirectMemory: %d/%d", getDirectMemory(), getMaxDirectMemory()));
if (isDetailedDump())
{
dump.add(new DumpableCollection("Indirect Buckets", _heapBuffers.values()));
dump.add(new DumpableCollection("Direct Buckets", _directBuffers.values()));
}
else
{
dump.add("Indirect Buckets size=" + _heapBuffers.size());
dump.add("Direct Buckets size=" + _directBuffers.size());
}
Dumpable.dumpObjects(out, indent, this, dump);
}
@Override
public String toString()
{
return String.format("%s@%x{maxQueueLength=%s, factor=%s}",
this.getClass().getSimpleName(), hashCode(),
getMaxQueueLength(),
getCapacityFactor());
}
}

View File

@ -28,11 +28,11 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -69,10 +69,13 @@ public class ArrayByteBufferPoolTest
@Test
public void testMaxRelease()
{
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(10, 100, 1000);
int minCapacity = 10;
int factor = 1;
int maxCapacity = 1024;
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(minCapacity, factor, maxCapacity);
ByteBufferPool.Bucket[] buckets = bufferPool.bucketsFor(true);
for (int size = 999; size <= 1001; size++)
for (int size = maxCapacity - 1; size <= maxCapacity + 1; size++)
{
bufferPool.clear();
ByteBuffer buffer = bufferPool.acquire(size, true);
@ -91,7 +94,11 @@ public class ArrayByteBufferPoolTest
.filter(Objects::nonNull)
.mapToInt(Bucket::size)
.sum();
assertEquals(size <= 1000, 1 == pooled);
if (size <= maxCapacity)
assertThat(pooled, is(1));
else
assertThat(pooled, is(0));
}
}
@ -215,7 +222,7 @@ public class ArrayByteBufferPoolTest
// Now the oldest buffer should be gone and we have: 1+2x2+3=8
long memory = bufferPool.getMemory(true);
assertThat(memory, lessThan((long)maxMemory));
assertNull(buckets[3]);
assertTrue(buckets[3].isEmpty());
// Create and release a large buffer.
// Max memory is exceeded and buckets 3 and 1 are cleared.
@ -224,7 +231,7 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer);
memory = bufferPool.getMemory(true);
assertThat(memory, lessThanOrEqualTo((long)maxMemory));
assertNull(buckets[0]);
assertNull(buckets[2]);
assertTrue(buckets[0].isEmpty());
assertTrue(buckets[2].isEmpty());
}
}

View File

@ -32,7 +32,6 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -164,7 +163,7 @@ public class MappedByteBufferPoolTest
// Now the oldest buffer should be gone and we have: 1+2x2+3=8
long memory = bufferPool.getMemory(true);
assertThat(memory, lessThan((long)maxMemory));
assertNull(buckets.get(4));
assertTrue(buckets.get(4).isEmpty());
// Create and release a large buffer.
// Max memory is exceeded and buckets 3 and 1 are cleared.
@ -173,7 +172,7 @@ public class MappedByteBufferPoolTest
bufferPool.release(buffer);
memory = bufferPool.getMemory(true);
assertThat(memory, lessThanOrEqualTo((long)maxMemory));
assertNull(buckets.get(1));
assertNull(buckets.get(3));
assertTrue(buckets.get(1).isEmpty());
assertTrue(buckets.get(3).isEmpty());
}
}

View File

@ -0,0 +1,10 @@
<?xml version="1.0"?><!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
<Configure>
<New id="byteBufferPool" class="org.eclipse.jetty.io.LogarithmicArrayByteBufferPool">
<Arg type="int"><Property name="jetty.byteBufferPool.minCapacity" default="0"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
</New>
</Configure>

View File

@ -5,7 +5,7 @@
<Arg type="int"><Property name="jetty.byteBufferPool.factor" default="1024"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="-1"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="0"/></Arg>
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="0"/></Arg>
</New>
</Configure>

View File

@ -0,0 +1,30 @@
# DO NOT EDIT - See: https://www.eclipse.org/jetty/documentation/current/startup-modules.html
[description]
Configures the ByteBufferPool used by ServerConnectors whose bucket sizes increase exponentially instead of linearly.
[tags]
bytebufferpool
[provides]
bytebufferpool
[xml]
etc/jetty-bytebufferpool-logarithmic.xml
[ini-template]
### Server ByteBufferPool Configuration
## Minimum capacity to pool ByteBuffers
#jetty.byteBufferPool.minCapacity=0
## Maximum capacity to pool ByteBuffers
#jetty.byteBufferPool.maxCapacity=65536
## Maximum queue length for each bucket (-1 for unbounded)
#jetty.byteBufferPool.maxQueueLength=-1
## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited)
#jetty.byteBufferPool.maxHeapMemory=0
## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited)
#jetty.byteBufferPool.maxDirectMemory=0

View File

@ -3,6 +3,9 @@
[description]
Configures the ByteBufferPool used by ServerConnectors.
[tags]
bytebufferpool
[xml]
etc/jetty-bytebufferpool.xml
@ -20,8 +23,8 @@ etc/jetty-bytebufferpool.xml
## Maximum queue length for each bucket (-1 for unbounded)
#jetty.byteBufferPool.maxQueueLength=-1
## Maximum heap memory retainable by the pool (-1 for unlimited)
#jetty.byteBufferPool.maxHeapMemory=-1
## Maximum heap memory retainable by the pool (0 for heuristic, -1 for unlimited)
#jetty.byteBufferPool.maxHeapMemory=0
## Maximum direct memory retainable by the pool (-1 for unlimited)
#jetty.byteBufferPool.maxDirectMemory=-1
## Maximum direct memory retainable by the pool (0 for heuristic, -1 for unlimited)
#jetty.byteBufferPool.maxDirectMemory=0

View File

@ -0,0 +1,248 @@
//
// ========================================================================
// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.LogarithmicArrayByteBufferPool;
import org.eclipse.jetty.io.NullByteBufferPool;
import org.eclipse.jetty.jmx.MBeanContainer;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.NativeWebSocketConfiguration;
import org.eclipse.jetty.websocket.server.NativeWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.WebSocketUpgradeFilter;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class WebSocketBufferPoolTest
{
private static final Logger LOG = Log.getLogger(WebSocketBufferPoolTest.class);
private static final char[] ALPHABET = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789{}\":;<>,.()[]".toCharArray();
private static final AtomicReference<CountDownLatch> _latchReference = new AtomicReference<>();
private Server _server;
private ArrayByteBufferPool _bufferPool;
private HttpClient _httpClient;
private WebSocketClient _websocketClient;
@WebSocket
public static class ServerSocket
{
@OnWebSocketMessage
public void onMessage(Session session, String message) throws InterruptedException
{
CountDownLatch latch = _latchReference.get();
latch.countDown();
assertTrue(latch.await(20, TimeUnit.SECONDS));
session.close(1000, "success");
}
}
@WebSocket
public static class ClientSocket
{
private int code;
private String reason;
private final CountDownLatch closeLatch = new CountDownLatch(1);
@OnWebSocketMessage
public void onMessage(Session session, String message)
{
if (LOG.isDebugEnabled())
LOG.debug("MessageSize: {}", message.length());
}
@OnWebSocketError
public void onError(Throwable t)
{
t.printStackTrace();
}
@OnWebSocketClose
public void onClose(int code, String status)
{
this.code = code;
this.reason = status;
closeLatch.countDown();
}
}
public String randomString(int len)
{
StringBuilder sb = new StringBuilder();
for (int i = 0; i < len; i++)
{
sb.append(ALPHABET[(int)(Math.random() * ALPHABET.length)]);
}
return sb.toString();
}
@BeforeEach
public void before() throws Exception
{
// Ensure the threadPool can handle more than 100 threads.
QueuedThreadPool threadPool = new QueuedThreadPool(200);
_server = new Server(threadPool);
int maxMemory = 1024 * 1024 * 16;
_bufferPool = new LogarithmicArrayByteBufferPool(-1, -1, -1, maxMemory, maxMemory);
_bufferPool.setDetailedDump(true);
_server.addBean(_bufferPool);
ServerConnector connector = new ServerConnector(_server);
connector.setPort(8080);
_server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler();
WebSocketUpgradeFilter.configure(contextHandler);
NativeWebSocketServletContainerInitializer.configure(contextHandler, ((servletContext, configuration) ->
{
WebSocketPolicy policy = configuration.getPolicy();
policy.setMaxTextMessageBufferSize(Integer.MAX_VALUE);
policy.setMaxTextMessageSize(Integer.MAX_VALUE);
configuration.addMapping("/websocket", ServerSocket.class);
}));
contextHandler.addServlet(new ServletHolder(new HttpServlet()
{
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException
{
CountDownLatch countDownLatch = _latchReference.get();
if (countDownLatch != null)
assertThat(countDownLatch.getCount(), is(0L));
int numThreads = Integer.parseInt(req.getParameter("numThreads"));
_latchReference.compareAndSet(countDownLatch, new CountDownLatch(numThreads));
}
}), "/setCount");
_server.setHandler(contextHandler);
_server.addBean(new MBeanContainer(ManagementFactory.getPlatformMBeanServer()));
_server.start();
_httpClient = new HttpClient();
_httpClient.setByteBufferPool(new NullByteBufferPool());
_websocketClient = new WebSocketClient(_httpClient);
_websocketClient.start();
// Check the bufferPool used for the server is now used in the websocket configuration.
NativeWebSocketConfiguration config = (NativeWebSocketConfiguration)contextHandler.getAttribute(NativeWebSocketConfiguration.class.getName());
assertNotNull(config);
assertThat(config.getFactory().getBufferPool(), is(_bufferPool));
}
@AfterEach
public void after() throws Exception
{
_websocketClient.stop();
_server.stop();
}
@Test
public void test() throws Exception
{
int numThreads = 100;
int maxMessageSize = 1024 * 64;
for (int msgSize = 1024; msgSize < maxMessageSize; msgSize += 1024)
{
ContentResponse get = _httpClient.GET("http://localhost:8080/setCount?numThreads=" + numThreads);
assertThat(get.getStatus(), is(200));
Callback.Completable completable = new Callback.Completable()
{
final AtomicInteger count = new AtomicInteger(numThreads);
@Override
public void succeeded()
{
if (count.decrementAndGet() == 0)
super.succeeded();
}
};
int messageSize = msgSize;
for (int i = 0; i < numThreads; i++)
{
new Thread(() ->
{
try
{
ClientSocket clientSocket = new ClientSocket();
URI uri = URI.create("ws://localhost:8080/websocket");
ClientUpgradeRequest upgradeRequest = new ClientUpgradeRequest();
upgradeRequest.addExtensions("permessage-deflate");
Session session = _websocketClient.connect(clientSocket, uri, upgradeRequest).get(5, TimeUnit.SECONDS);
assertTrue(session.getUpgradeResponse().getExtensions().stream().anyMatch(config -> config.getName().equals("permessage-deflate")));
session.getRemote().sendString(randomString(messageSize));
assertTrue(clientSocket.closeLatch.await(20, TimeUnit.SECONDS));
assertThat(clientSocket.code, is(1000));
assertThat(clientSocket.reason, is("success"));
completable.complete(null);
}
catch (Throwable t)
{
completable.failed(t);
}
}).start();
}
completable.get(30, TimeUnit.SECONDS);
}
assertThat(_bufferPool.getDirectMemory(), lessThanOrEqualTo(_bufferPool.getMaxDirectMemory()));
assertThat(_bufferPool.getHeapMemory(), lessThanOrEqualTo(_bufferPool.getMaxHeapMemory()));
}
}

View File

@ -48,6 +48,7 @@ import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.util.DecoratedObjectFactory;
import org.eclipse.jetty.util.DeprecationWarning;
import org.eclipse.jetty.util.StringUtil;
@ -119,7 +120,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
public WebSocketServerFactory(ServletContext context)
{
this(context, WebSocketPolicy.newServerPolicy(), new MappedByteBufferPool());
this(context, WebSocketPolicy.newServerPolicy(), null);
}
public WebSocketServerFactory(ServletContext context, ByteBufferPool bufferPool)
@ -135,7 +136,7 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
*/
public WebSocketServerFactory(ServletContext context, WebSocketPolicy policy)
{
this(context, policy, new MappedByteBufferPool());
this(context, policy, null);
}
public WebSocketServerFactory(ServletContext context, WebSocketPolicy policy, ByteBufferPool bufferPool)
@ -161,7 +162,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.defaultPolicy = policy;
this.objectFactory = objectFactory;
this.executor = executor;
this.bufferPool = bufferPool;
this.creator = this;
this.contextClassloader = Thread.currentThread().getContextClassLoader();
@ -201,6 +201,21 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
this.handshakes.put(HandshakeRFC6455.VERSION, new HandshakeRFC6455());
this.sessionFactories.add(new WebSocketSessionFactory(this));
if (bufferPool == null)
{
ContextHandler contextHandler = ServletContextHandler.getContextHandler(context);
if (contextHandler != null)
{
Server server = contextHandler.getServer();
if (server != null)
bufferPool = server.getBean(ByteBufferPool.class);
}
if (bufferPool == null)
bufferPool = new MappedByteBufferPool();
}
this.bufferPool = bufferPool;
addBean(bufferPool);
// Create supportedVersions
List<Integer> versions = new ArrayList<>(handshakes.keySet());
versions.sort(Collections.reverseOrder()); // newest first
@ -216,7 +231,6 @@ public class WebSocketServerFactory extends ContainerLifeCycle implements WebSoc
supportedVersions = rv.toString();
addBean(scheduler);
addBean(bufferPool);
addBean(sessionTracker);
addBean(extensionFactory);
listeners.add(this.sessionTracker);