Merge pull request #3391 from eclipse/jetty-9.4.x-1861-limit_bufferpool_memory
Issue #1861 - Limit total bytes pooled by ByteBufferPools.
This commit is contained in:
commit
6196ff19e9
|
@ -0,0 +1,108 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||
|
||||
@ManagedObject
|
||||
abstract class AbstractByteBufferPool implements ByteBufferPool
|
||||
{
|
||||
private final int _factor;
|
||||
private final int _maxQueueLength;
|
||||
private final long _maxHeapMemory;
|
||||
private final AtomicLong _heapMemory = new AtomicLong();
|
||||
private final long _maxDirectMemory;
|
||||
private final AtomicLong _directMemory = new AtomicLong();
|
||||
|
||||
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
|
||||
{
|
||||
_factor = factor <= 0 ? 1024 : factor;
|
||||
_maxQueueLength = maxQueueLength;
|
||||
_maxHeapMemory = maxHeapMemory;
|
||||
_maxDirectMemory = maxDirectMemory;
|
||||
}
|
||||
|
||||
protected int getCapacityFactor()
|
||||
{
|
||||
return _factor;
|
||||
}
|
||||
|
||||
protected int getMaxQueueLength()
|
||||
{
|
||||
return _maxQueueLength;
|
||||
}
|
||||
|
||||
protected void decrementMemory(ByteBuffer buffer)
|
||||
{
|
||||
updateMemory(buffer, false);
|
||||
}
|
||||
|
||||
protected void incrementMemory(ByteBuffer buffer)
|
||||
{
|
||||
updateMemory(buffer, true);
|
||||
}
|
||||
|
||||
private void updateMemory(ByteBuffer buffer, boolean addOrSub)
|
||||
{
|
||||
AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory;
|
||||
int capacity = buffer.capacity();
|
||||
memory.addAndGet(addOrSub ? capacity : -capacity);
|
||||
}
|
||||
|
||||
protected void releaseExcessMemory(boolean direct, Consumer<Boolean> clearFn)
|
||||
{
|
||||
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
|
||||
if (maxMemory > 0)
|
||||
{
|
||||
while (getMemory(direct) > maxMemory)
|
||||
clearFn.accept(direct);
|
||||
}
|
||||
}
|
||||
|
||||
@ManagedAttribute("The bytes retained by direct ByteBuffers")
|
||||
public long getDirectMemory()
|
||||
{
|
||||
return getMemory(true);
|
||||
}
|
||||
|
||||
@ManagedAttribute("The bytes retained by heap ByteBuffers")
|
||||
public long getHeapMemory()
|
||||
{
|
||||
return getMemory(false);
|
||||
}
|
||||
|
||||
public long getMemory(boolean direct)
|
||||
{
|
||||
AtomicLong memory = direct ? _directMemory : _heapMemory;
|
||||
return memory.get();
|
||||
}
|
||||
|
||||
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
|
||||
public void clear()
|
||||
{
|
||||
_heapMemory.set(0);
|
||||
_directMemory.set(0);
|
||||
}
|
||||
}
|
|
@ -19,9 +19,12 @@
|
|||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
import java.util.function.IntFunction;
|
||||
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||
|
||||
/**
|
||||
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p>
|
||||
|
@ -30,19 +33,18 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
|
|||
* 2048, and so on.</p>
|
||||
*/
|
||||
@ManagedObject
|
||||
public class ArrayByteBufferPool implements ByteBufferPool
|
||||
public class ArrayByteBufferPool extends AbstractByteBufferPool
|
||||
{
|
||||
private final int _minCapacity;
|
||||
private final ByteBufferPool.Bucket[] _direct;
|
||||
private final ByteBufferPool.Bucket[] _indirect;
|
||||
private final int _factor;
|
||||
|
||||
/**
|
||||
* Creates a new ArrayByteBufferPool with a default configuration.
|
||||
*/
|
||||
public ArrayByteBufferPool()
|
||||
{
|
||||
this(-1, -1, -1, -1);
|
||||
this(-1, -1, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -54,7 +56,7 @@ public class ArrayByteBufferPool implements ByteBufferPool
|
|||
*/
|
||||
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
|
||||
{
|
||||
this(minCapacity, factor, maxCapacity, -1);
|
||||
this(minCapacity, factor, maxCapacity, -1, -1, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -67,68 +69,153 @@ public class ArrayByteBufferPool implements ByteBufferPool
|
|||
*/
|
||||
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
|
||||
{
|
||||
this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new ArrayByteBufferPool with the given configuration.
|
||||
*
|
||||
* @param minCapacity the minimum ByteBuffer capacity
|
||||
* @param factor the capacity factor
|
||||
* @param maxCapacity the maximum ByteBuffer capacity
|
||||
* @param maxQueueLength the maximum ByteBuffer queue length
|
||||
* @param maxHeapMemory the max heap memory in bytes
|
||||
* @param maxDirectMemory the max direct memory in bytes
|
||||
*/
|
||||
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
|
||||
{
|
||||
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
|
||||
|
||||
factor = getCapacityFactor();
|
||||
if (minCapacity <= 0)
|
||||
minCapacity = 0;
|
||||
if (factor <= 0)
|
||||
factor = 1024;
|
||||
if (maxCapacity <= 0)
|
||||
maxCapacity = 64 * 1024;
|
||||
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
|
||||
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
|
||||
_minCapacity = minCapacity;
|
||||
_factor = factor;
|
||||
|
||||
int length = maxCapacity / factor;
|
||||
_direct = new ByteBufferPool.Bucket[length];
|
||||
_indirect = new ByteBufferPool.Bucket[length];
|
||||
|
||||
int capacity = 0;
|
||||
for (int i = 0; i < _direct.length; ++i)
|
||||
{
|
||||
capacity += _factor;
|
||||
_direct[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength);
|
||||
_indirect[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer acquire(int size, boolean direct)
|
||||
{
|
||||
ByteBufferPool.Bucket bucket = bucketFor(size, direct);
|
||||
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
|
||||
ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
|
||||
if (bucket == null)
|
||||
return newByteBuffer(size, direct);
|
||||
return bucket.acquire(direct);
|
||||
return newByteBuffer(capacity, direct);
|
||||
ByteBuffer buffer = bucket.acquire();
|
||||
if (buffer == null)
|
||||
return newByteBuffer(capacity, direct);
|
||||
decrementMemory(buffer);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void release(ByteBuffer buffer)
|
||||
{
|
||||
if (buffer != null)
|
||||
if (buffer == null)
|
||||
return;
|
||||
boolean direct = buffer.isDirect();
|
||||
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), direct, this::newBucket);
|
||||
if (bucket != null)
|
||||
{
|
||||
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect());
|
||||
if (bucket != null)
|
||||
bucket.release(buffer);
|
||||
bucket.release(buffer);
|
||||
incrementMemory(buffer);
|
||||
releaseExcessMemory(direct, this::clearOldestBucket);
|
||||
}
|
||||
}
|
||||
|
||||
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
|
||||
private Bucket newBucket(int key)
|
||||
{
|
||||
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void clear()
|
||||
{
|
||||
super.clear();
|
||||
for (int i = 0; i < _direct.length; ++i)
|
||||
{
|
||||
_direct[i].clear();
|
||||
_indirect[i].clear();
|
||||
Bucket bucket = _direct[i];
|
||||
if (bucket != null)
|
||||
bucket.clear();
|
||||
_direct[i] = null;
|
||||
bucket = _indirect[i];
|
||||
if (bucket != null)
|
||||
bucket.clear();
|
||||
_indirect[i] = null;
|
||||
}
|
||||
}
|
||||
|
||||
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct)
|
||||
private void clearOldestBucket(boolean direct)
|
||||
{
|
||||
if (capacity <= _minCapacity)
|
||||
long oldest = Long.MAX_VALUE;
|
||||
int index = -1;
|
||||
Bucket[] buckets = bucketsFor(direct);
|
||||
for (int i = 0; i < buckets.length; ++i)
|
||||
{
|
||||
Bucket bucket = buckets[i];
|
||||
if (bucket == null)
|
||||
continue;
|
||||
long lastUpdate = bucket.getLastUpdate();
|
||||
if (lastUpdate < oldest)
|
||||
{
|
||||
oldest = lastUpdate;
|
||||
index = i;
|
||||
}
|
||||
}
|
||||
if (index >= 0)
|
||||
{
|
||||
Bucket bucket = buckets[index];
|
||||
buckets[index] = null;
|
||||
// The same bucket may be concurrently
|
||||
// removed, so we need this null guard.
|
||||
if (bucket != null)
|
||||
bucket.clear(this::decrementMemory);
|
||||
}
|
||||
}
|
||||
|
||||
private int bucketFor(int capacity)
|
||||
{
|
||||
return (capacity - 1) / getCapacityFactor();
|
||||
}
|
||||
|
||||
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction<Bucket> newBucket)
|
||||
{
|
||||
if (capacity < _minCapacity)
|
||||
return null;
|
||||
int b = (capacity - 1) / _factor;
|
||||
int b = bucketFor(capacity);
|
||||
if (b >= _direct.length)
|
||||
return null;
|
||||
return bucketsFor(direct)[b];
|
||||
Bucket[] buckets = bucketsFor(direct);
|
||||
Bucket bucket = buckets[b];
|
||||
if (bucket == null && newBucket != null)
|
||||
buckets[b] = bucket = newBucket.apply(b + 1);
|
||||
return bucket;
|
||||
}
|
||||
|
||||
@ManagedAttribute("The number of pooled direct ByteBuffers")
|
||||
public long getDirectByteBufferCount()
|
||||
{
|
||||
return getByteBufferCount(true);
|
||||
}
|
||||
|
||||
@ManagedAttribute("The number of pooled heap ByteBuffers")
|
||||
public long getHeapByteBufferCount()
|
||||
{
|
||||
return getByteBufferCount(false);
|
||||
}
|
||||
|
||||
private long getByteBufferCount(boolean direct)
|
||||
{
|
||||
return Arrays.stream(bucketsFor(direct))
|
||||
.filter(Objects::nonNull)
|
||||
.mapToLong(Bucket::size)
|
||||
.sum();
|
||||
}
|
||||
|
||||
// Package local for testing
|
||||
|
|
|
@ -24,6 +24,7 @@ import java.util.Deque;
|
|||
import java.util.List;
|
||||
import java.util.concurrent.ConcurrentLinkedDeque;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
||||
|
@ -60,7 +61,7 @@ public interface ByteBufferPool
|
|||
* <p>Creates a new ByteBuffer of the given capacity and the given directness.</p>
|
||||
*
|
||||
* @param capacity the ByteBuffer capacity
|
||||
* @param direct the ByteBuffer directness
|
||||
* @param direct the ByteBuffer directness
|
||||
* @return a newly allocated ByteBuffer
|
||||
*/
|
||||
default ByteBuffer newByteBuffer(int capacity, boolean direct)
|
||||
|
@ -131,54 +132,80 @@ public interface ByteBufferPool
|
|||
}
|
||||
}
|
||||
|
||||
class Bucket
|
||||
public static class Bucket
|
||||
{
|
||||
private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>();
|
||||
private final ByteBufferPool _pool;
|
||||
private final int _capacity;
|
||||
private final AtomicInteger _space;
|
||||
private final int _maxSize;
|
||||
private final AtomicInteger _size;
|
||||
private long _lastUpdate = System.nanoTime();
|
||||
|
||||
public Bucket(ByteBufferPool pool, int capacity, int maxSize)
|
||||
{
|
||||
_pool = pool;
|
||||
_capacity = capacity;
|
||||
_space = maxSize > 0 ? new AtomicInteger(maxSize) : null;
|
||||
_maxSize = maxSize;
|
||||
_size = maxSize > 0 ? new AtomicInteger() : null;
|
||||
}
|
||||
|
||||
public ByteBuffer acquire()
|
||||
{
|
||||
ByteBuffer buffer = queuePoll();
|
||||
if (buffer == null)
|
||||
return null;
|
||||
if (_size != null)
|
||||
_size.decrementAndGet();
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param direct whether to create a direct buffer when none is available
|
||||
* @return a ByteBuffer
|
||||
* @deprecated use {@link #acquire()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public ByteBuffer acquire(boolean direct)
|
||||
{
|
||||
ByteBuffer buffer = queuePoll();
|
||||
if (buffer == null)
|
||||
return _pool.newByteBuffer(_capacity, direct);
|
||||
if (_space != null)
|
||||
_space.incrementAndGet();
|
||||
if (_size != null)
|
||||
_size.decrementAndGet();
|
||||
return buffer;
|
||||
}
|
||||
|
||||
public void release(ByteBuffer buffer)
|
||||
{
|
||||
_lastUpdate = System.nanoTime();
|
||||
BufferUtil.clear(buffer);
|
||||
if (_space == null)
|
||||
if (_size == null)
|
||||
queueOffer(buffer);
|
||||
else if (_space.decrementAndGet() >= 0)
|
||||
else if (_size.incrementAndGet() <= _maxSize)
|
||||
queueOffer(buffer);
|
||||
else
|
||||
_space.incrementAndGet();
|
||||
_size.decrementAndGet();
|
||||
}
|
||||
|
||||
public void clear()
|
||||
{
|
||||
if (_space == null)
|
||||
clear(null);
|
||||
}
|
||||
|
||||
void clear(Consumer<ByteBuffer> memoryFn)
|
||||
{
|
||||
int size = _size == null ? 0 : _size.get() - 1;
|
||||
while (size >= 0)
|
||||
{
|
||||
queueClear();
|
||||
}
|
||||
else
|
||||
{
|
||||
int s = _space.getAndSet(0);
|
||||
while (s-- > 0)
|
||||
ByteBuffer buffer = queuePoll();
|
||||
if (buffer == null)
|
||||
break;
|
||||
if (memoryFn != null)
|
||||
memoryFn.accept(buffer);
|
||||
if (_size != null)
|
||||
{
|
||||
if (queuePoll() == null)
|
||||
_space.incrementAndGet();
|
||||
_size.decrementAndGet();
|
||||
--size;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -193,11 +220,6 @@ public interface ByteBufferPool
|
|||
return _queue.poll();
|
||||
}
|
||||
|
||||
private void queueClear()
|
||||
{
|
||||
_queue.clear();
|
||||
}
|
||||
|
||||
boolean isEmpty()
|
||||
{
|
||||
return _queue.isEmpty();
|
||||
|
@ -208,10 +230,15 @@ public interface ByteBufferPool
|
|||
return _queue.size();
|
||||
}
|
||||
|
||||
long getLastUpdate()
|
||||
{
|
||||
return _lastUpdate;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x{%d/%d}", getClass().getSimpleName(), hashCode(), size(), _capacity);
|
||||
return String.format("%s@%x{%d/%d@%d}", getClass().getSimpleName(), hashCode(), size(), _maxSize, _capacity);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -19,13 +19,15 @@
|
|||
package org.eclipse.jetty.io;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.annotation.ManagedOperation;
|
||||
|
||||
/**
|
||||
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.</p>
|
||||
|
@ -34,12 +36,11 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
|
|||
* queue of ByteBuffers each of capacity 2048, and so on.</p>
|
||||
*/
|
||||
@ManagedObject
|
||||
public class MappedByteBufferPool implements ByteBufferPool
|
||||
public class MappedByteBufferPool extends AbstractByteBufferPool
|
||||
{
|
||||
private final ConcurrentMap<Integer, Bucket> directBuffers = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<Integer, Bucket> heapBuffers = new ConcurrentHashMap<>();
|
||||
private final int _factor;
|
||||
private final int _maxQueueLength;
|
||||
private final ConcurrentMap<Integer, Bucket> _directBuffers = new ConcurrentHashMap<>();
|
||||
private final ConcurrentMap<Integer, Bucket> _heapBuffers = new ConcurrentHashMap<>();
|
||||
private final Function<Integer, Bucket> _newBucket;
|
||||
|
||||
/**
|
||||
* Creates a new MappedByteBufferPool with a default configuration.
|
||||
|
@ -60,31 +61,62 @@ public class MappedByteBufferPool implements ByteBufferPool
|
|||
}
|
||||
|
||||
/**
|
||||
* Creates a new MappedByteBufferPool with the given capacity factor and max queue length.
|
||||
* Creates a new MappedByteBufferPool with the given configuration.
|
||||
*
|
||||
* @param factor the capacity factor
|
||||
* @param maxQueueLength the maximum ByteBuffer queue length
|
||||
*/
|
||||
public MappedByteBufferPool(int factor, int maxQueueLength)
|
||||
{
|
||||
_factor = factor <= 0 ? 1024 : factor;
|
||||
_maxQueueLength = maxQueueLength;
|
||||
this(factor, maxQueueLength, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new MappedByteBufferPool with the given configuration.
|
||||
*
|
||||
* @param factor the capacity factor
|
||||
* @param maxQueueLength the maximum ByteBuffer queue length
|
||||
* @param newBucket the function that creates a Bucket
|
||||
*/
|
||||
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket)
|
||||
{
|
||||
this(factor, maxQueueLength, newBucket, -1, -1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new MappedByteBufferPool with the given configuration.
|
||||
*
|
||||
* @param factor the capacity factor
|
||||
* @param maxQueueLength the maximum ByteBuffer queue length
|
||||
* @param newBucket the function that creates a Bucket
|
||||
* @param maxHeapMemory the max heap memory in bytes
|
||||
* @param maxDirectMemory the max direct memory in bytes
|
||||
*/
|
||||
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
|
||||
{
|
||||
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
|
||||
_newBucket = newBucket != null ? newBucket : this::newBucket;
|
||||
}
|
||||
|
||||
private Bucket newBucket(int key)
|
||||
{
|
||||
return new Bucket(this, key * _factor, _maxQueueLength);
|
||||
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
|
||||
}
|
||||
|
||||
@Override
|
||||
public ByteBuffer acquire(int size, boolean direct)
|
||||
{
|
||||
int b = bucketFor(size);
|
||||
int capacity = b * getCapacityFactor();
|
||||
ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
|
||||
Bucket bucket = buffers.get(b);
|
||||
if (bucket == null)
|
||||
return newByteBuffer(b * _factor, direct);
|
||||
return bucket.acquire(direct);
|
||||
return newByteBuffer(capacity, direct);
|
||||
ByteBuffer buffer = bucket.acquire();
|
||||
if (buffer == null)
|
||||
return newByteBuffer(capacity, direct);
|
||||
decrementMemory(buffer);
|
||||
return buffer;
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -93,37 +125,86 @@ public class MappedByteBufferPool implements ByteBufferPool
|
|||
if (buffer == null)
|
||||
return; // nothing to do
|
||||
|
||||
// validate that this buffer is from this pool
|
||||
assert ((buffer.capacity() % _factor) == 0);
|
||||
int capacity = buffer.capacity();
|
||||
// Validate that this buffer is from this pool.
|
||||
assert ((capacity % getCapacityFactor()) == 0);
|
||||
|
||||
int b = bucketFor(buffer.capacity());
|
||||
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect());
|
||||
|
||||
Bucket bucket = buckets.computeIfAbsent(b, this::newBucket);
|
||||
int b = bucketFor(capacity);
|
||||
boolean direct = buffer.isDirect();
|
||||
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
|
||||
Bucket bucket = buckets.computeIfAbsent(b, _newBucket);
|
||||
bucket.release(buffer);
|
||||
incrementMemory(buffer);
|
||||
releaseExcessMemory(direct, this::clearOldestBucket);
|
||||
}
|
||||
|
||||
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
|
||||
@Override
|
||||
public void clear()
|
||||
{
|
||||
directBuffers.values().forEach(Bucket::clear);
|
||||
directBuffers.clear();
|
||||
heapBuffers.values().forEach(Bucket::clear);
|
||||
heapBuffers.clear();
|
||||
super.clear();
|
||||
_directBuffers.values().forEach(Bucket::clear);
|
||||
_directBuffers.clear();
|
||||
_heapBuffers.values().forEach(Bucket::clear);
|
||||
_heapBuffers.clear();
|
||||
}
|
||||
|
||||
private void clearOldestBucket(boolean direct)
|
||||
{
|
||||
long oldest = Long.MAX_VALUE;
|
||||
int index = -1;
|
||||
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
|
||||
for (Map.Entry<Integer, Bucket> entry : buckets.entrySet())
|
||||
{
|
||||
Bucket bucket = entry.getValue();
|
||||
long lastUpdate = bucket.getLastUpdate();
|
||||
if (lastUpdate < oldest)
|
||||
{
|
||||
oldest = lastUpdate;
|
||||
index = entry.getKey();
|
||||
}
|
||||
}
|
||||
if (index >= 0)
|
||||
{
|
||||
Bucket bucket = buckets.remove(index);
|
||||
// The same bucket may be concurrently
|
||||
// removed, so we need this null guard.
|
||||
if (bucket != null)
|
||||
bucket.clear(this::decrementMemory);
|
||||
}
|
||||
}
|
||||
|
||||
private int bucketFor(int size)
|
||||
{
|
||||
int bucket = size / _factor;
|
||||
if (size % _factor > 0)
|
||||
int factor = getCapacityFactor();
|
||||
int bucket = size / factor;
|
||||
if (bucket * factor != size)
|
||||
++bucket;
|
||||
return bucket;
|
||||
}
|
||||
|
||||
@ManagedAttribute("The number of pooled direct ByteBuffers")
|
||||
public long getDirectByteBufferCount()
|
||||
{
|
||||
return getByteBufferCount(true);
|
||||
}
|
||||
|
||||
@ManagedAttribute("The number of pooled heap ByteBuffers")
|
||||
public long getHeapByteBufferCount()
|
||||
{
|
||||
return getByteBufferCount(false);
|
||||
}
|
||||
|
||||
private long getByteBufferCount(boolean direct)
|
||||
{
|
||||
return bucketsFor(direct).values().stream()
|
||||
.mapToLong(Bucket::size)
|
||||
.sum();
|
||||
}
|
||||
|
||||
// Package local for testing
|
||||
ConcurrentMap<Integer, Bucket> bucketsFor(boolean direct)
|
||||
{
|
||||
return direct ? directBuffers : heapBuffers;
|
||||
return direct ? _directBuffers : _heapBuffers;
|
||||
}
|
||||
|
||||
public static class Tagged extends MappedByteBufferPool
|
||||
|
|
|
@ -20,14 +20,18 @@ package org.eclipse.jetty.io;
|
|||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.Arrays;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
|
@ -46,12 +50,18 @@ public class ArrayByteBufferPoolTest
|
|||
assertTrue(buffer.isDirect());
|
||||
assertEquals(size, buffer.capacity());
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
assertTrue(bucket.isEmpty());
|
||||
{
|
||||
if (bucket != null)
|
||||
assertTrue(bucket.isEmpty());
|
||||
}
|
||||
|
||||
bufferPool.release(buffer);
|
||||
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
assertTrue(bucket.isEmpty());
|
||||
{
|
||||
if (bucket != null)
|
||||
assertTrue(bucket.isEmpty());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -69,15 +79,17 @@ public class ArrayByteBufferPoolTest
|
|||
assertTrue(buffer.isDirect());
|
||||
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
assertTrue(bucket.isEmpty());
|
||||
{
|
||||
if (bucket != null)
|
||||
assertTrue(bucket.isEmpty());
|
||||
}
|
||||
|
||||
bufferPool.release(buffer);
|
||||
|
||||
int pooled = 0;
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
{
|
||||
pooled += bucket.size();
|
||||
}
|
||||
int pooled = Arrays.stream(buckets)
|
||||
.filter(Objects::nonNull)
|
||||
.mapToInt(Bucket::size)
|
||||
.sum();
|
||||
assertEquals(size <= 1000, 1 == pooled);
|
||||
}
|
||||
}
|
||||
|
@ -96,20 +108,17 @@ public class ArrayByteBufferPoolTest
|
|||
assertTrue(buffer.isDirect());
|
||||
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
assertTrue(bucket.isEmpty());
|
||||
{
|
||||
if (bucket != null)
|
||||
assertTrue(bucket.isEmpty());
|
||||
}
|
||||
|
||||
bufferPool.release(buffer);
|
||||
|
||||
int pooled = 0;
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
{
|
||||
if (!bucket.isEmpty())
|
||||
{
|
||||
pooled += bucket.size();
|
||||
// TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
|
||||
// TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
|
||||
}
|
||||
}
|
||||
int pooled = Arrays.stream(buckets)
|
||||
.filter(Objects::nonNull)
|
||||
.mapToInt(Bucket::size)
|
||||
.sum();
|
||||
assertEquals(1, pooled);
|
||||
}
|
||||
}
|
||||
|
@ -130,16 +139,10 @@ public class ArrayByteBufferPoolTest
|
|||
ByteBuffer buffer3 = bufferPool.acquire(size, false);
|
||||
bufferPool.release(buffer3);
|
||||
|
||||
int pooled = 0;
|
||||
for (ByteBufferPool.Bucket bucket : buckets)
|
||||
{
|
||||
if (!bucket.isEmpty())
|
||||
{
|
||||
pooled += bucket.size();
|
||||
// TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
|
||||
// TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
|
||||
}
|
||||
}
|
||||
int pooled = Arrays.stream(buckets)
|
||||
.filter(Objects::nonNull)
|
||||
.mapToInt(Bucket::size)
|
||||
.sum();
|
||||
assertEquals(1, pooled);
|
||||
|
||||
assertSame(buffer1, buffer2);
|
||||
|
@ -157,10 +160,16 @@ public class ArrayByteBufferPoolTest
|
|||
ByteBuffer buffer3 = bufferPool.acquire(512, false);
|
||||
|
||||
Bucket[] buckets = bufferPool.bucketsFor(false);
|
||||
Arrays.asList(buckets).forEach(b -> assertEquals(0, b.size()));
|
||||
Arrays.stream(buckets)
|
||||
.filter(Objects::nonNull)
|
||||
.forEach(b -> assertEquals(0, b.size()));
|
||||
|
||||
bufferPool.release(buffer1);
|
||||
Bucket bucket = Arrays.stream(buckets).filter(b -> b.size() > 0).findFirst().get();
|
||||
Bucket bucket = Arrays.stream(buckets)
|
||||
.filter(Objects::nonNull)
|
||||
.filter(b -> b.size() > 0)
|
||||
.findFirst()
|
||||
.orElseThrow(AssertionError::new);
|
||||
assertEquals(1, bucket.size());
|
||||
|
||||
bufferPool.release(buffer2);
|
||||
|
@ -169,4 +178,41 @@ public class ArrayByteBufferPoolTest
|
|||
bufferPool.release(buffer3);
|
||||
assertEquals(2, bucket.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxMemory()
|
||||
{
|
||||
int factor = 1024;
|
||||
int maxMemory = 11 * 1024;
|
||||
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, factor, -1, -1, -1, maxMemory);
|
||||
Bucket[] buckets = bufferPool.bucketsFor(true);
|
||||
|
||||
// Create the buckets - the oldest is the larger.
|
||||
// 1+2+3+4=10 / maxMemory=11.
|
||||
for (int i = 4; i >= 1; --i)
|
||||
{
|
||||
int capacity = factor * i;
|
||||
ByteBuffer buffer = bufferPool.acquire(capacity, true);
|
||||
bufferPool.release(buffer);
|
||||
}
|
||||
|
||||
// Create and release a buffer to exceed the max memory.
|
||||
ByteBuffer buffer = bufferPool.newByteBuffer(2 * factor, true);
|
||||
bufferPool.release(buffer);
|
||||
|
||||
// Now the oldest buffer should be gone and we have: 1+2x2+3=8
|
||||
long memory = bufferPool.getMemory(true);
|
||||
assertThat(memory, lessThan((long)maxMemory));
|
||||
assertNull(buckets[3]);
|
||||
|
||||
// Create and release a large buffer.
|
||||
// Max memory is exceeded and buckets 3 and 1 are cleared.
|
||||
// We will have 2x2+7=11.
|
||||
buffer = bufferPool.newByteBuffer(7 * factor, true);
|
||||
bufferPool.release(buffer);
|
||||
memory = bufferPool.getMemory(true);
|
||||
assertThat(memory, lessThanOrEqualTo((long)maxMemory));
|
||||
assertNull(buckets[0]);
|
||||
assertNull(buckets[2]);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -29,7 +29,10 @@ import org.junit.jupiter.api.Test;
|
|||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.containsString;
|
||||
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.lessThan;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertSame;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.junit.jupiter.api.Assertions.fail;
|
||||
|
@ -156,4 +159,41 @@ public class MappedByteBufferPoolTest
|
|||
bufferPool.release(buffer3);
|
||||
assertEquals(2, bucket.size());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxMemory()
|
||||
{
|
||||
int factor = 1024;
|
||||
int maxMemory = 11 * 1024;
|
||||
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory);
|
||||
ConcurrentMap<Integer, Bucket> buckets = bufferPool.bucketsFor(true);
|
||||
|
||||
// Create the buckets - the oldest is the larger.
|
||||
// 1+2+3+4=10 / maxMemory=11.
|
||||
for (int i = 4; i >= 1; --i)
|
||||
{
|
||||
int capacity = factor * i;
|
||||
ByteBuffer buffer = bufferPool.acquire(capacity, true);
|
||||
bufferPool.release(buffer);
|
||||
}
|
||||
|
||||
// Create and release a buffer to exceed the max memory.
|
||||
ByteBuffer buffer = bufferPool.newByteBuffer(2 * factor, true);
|
||||
bufferPool.release(buffer);
|
||||
|
||||
// Now the oldest buffer should be gone and we have: 1+2x2+3=8
|
||||
long memory = bufferPool.getMemory(true);
|
||||
assertThat(memory, lessThan((long)maxMemory));
|
||||
assertNull(buckets.get(4));
|
||||
|
||||
// Create and release a large buffer.
|
||||
// Max memory is exceeded and buckets 3 and 1 are cleared.
|
||||
// We will have 2x2+7=11.
|
||||
buffer = bufferPool.newByteBuffer(7 * factor, true);
|
||||
bufferPool.release(buffer);
|
||||
memory = bufferPool.getMemory(true);
|
||||
assertThat(memory, lessThanOrEqualTo((long)maxMemory));
|
||||
assertNull(buckets.get(1));
|
||||
assertNull(buckets.get(3));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,12 @@
|
|||
<?xml version="1.0"?>
|
||||
<!DOCTYPE Configure PUBLIC "-//Jetty//Configure//EN" "http://www.eclipse.org/jetty/configure_9_3.dtd">
|
||||
<Configure>
|
||||
<New id="byteBufferPool" class="org.eclipse.jetty.io.ArrayByteBufferPool">
|
||||
<Arg type="int"><Property name="jetty.byteBufferPool.minCapacity" default="0"/></Arg>
|
||||
<Arg type="int"><Property name="jetty.byteBufferPool.factor" default="1024"/></Arg>
|
||||
<Arg type="int"><Property name="jetty.byteBufferPool.maxCapacity" default="65536"/></Arg>
|
||||
<Arg type="int"><Property name="jetty.byteBufferPool.maxQueueLength" default="-1"/></Arg>
|
||||
<Arg type="long"><Property name="jetty.byteBufferPool.maxHeapMemory" default="-1"/></Arg>
|
||||
<Arg type="long"><Property name="jetty.byteBufferPool.maxDirectMemory" default="-1"/></Arg>
|
||||
</New>
|
||||
</Configure>
|
|
@ -26,6 +26,10 @@
|
|||
<Configure id="Server" class="org.eclipse.jetty.server.Server">
|
||||
<Arg name="threadpool"><Ref refid="threadPool"/></Arg>
|
||||
|
||||
<Call name="addBean">
|
||||
<Arg><Ref refid="byteBufferPool"/></Arg>
|
||||
</Call>
|
||||
|
||||
<!-- =========================================================== -->
|
||||
<!-- Add shared Scheduler instance -->
|
||||
<!-- =========================================================== -->
|
||||
|
|
|
@ -0,0 +1,27 @@
|
|||
DO NOT EDIT - See: https://www.eclipse.org/jetty/documentation/current/startup-modules.html
|
||||
|
||||
[description]
|
||||
Configures the ByteBufferPool used by ServerConnectors.
|
||||
|
||||
[xml]
|
||||
etc/jetty-bytebufferpool.xml
|
||||
|
||||
[ini-template]
|
||||
### Server ByteBufferPool Configuration
|
||||
## Minimum capacity to pool ByteBuffers
|
||||
#jetty.byteBufferPool.minCapacity=0
|
||||
|
||||
## Maximum capacity to pool ByteBuffers
|
||||
#jetty.byteBufferPool.maxCapacity=65536
|
||||
|
||||
## Capacity factor
|
||||
#jetty.byteBufferPool.factor=1024
|
||||
|
||||
## Maximum queue length for each bucket (-1 for unbounded)
|
||||
#jetty.byteBufferPool.maxQueueLength=-1
|
||||
|
||||
## Maximum heap memory retainable by the pool (-1 for unlimited)
|
||||
#jetty.byteBufferPool.maxHeapMemory=-1
|
||||
|
||||
## Maximum direct memory retainable by the pool (-1 for unlimited)
|
||||
#jetty.byteBufferPool.maxDirectMemory=-1
|
|
@ -11,6 +11,7 @@ logging
|
|||
|
||||
[depend]
|
||||
threadpool
|
||||
bytebufferpool
|
||||
|
||||
[lib]
|
||||
lib/servlet-api-3.1.jar
|
||||
|
|
Loading…
Reference in New Issue