Issue #1861 - Limit total bytes pooled by ByteBufferPools.
Fixed the implementation to correctly track the memory retained. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
7323b19f6f
commit
ced0361cab
|
@ -64,13 +64,11 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
|
|||
int capacity = buffer.capacity();
|
||||
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
|
||||
AtomicLong memory = direct ? _directMemory : _heapMemory;
|
||||
if (maxMemory <= 0)
|
||||
return true;
|
||||
while (true)
|
||||
{
|
||||
long current = memory.get();
|
||||
long value = current + capacity;
|
||||
if (value > maxMemory)
|
||||
if (maxMemory > 0 && value > maxMemory)
|
||||
return false;
|
||||
if (memory.compareAndSet(current, value))
|
||||
return true;
|
||||
|
|
|
@ -100,13 +100,13 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
|
|||
@Override
|
||||
public ByteBuffer acquire(int size, boolean direct)
|
||||
{
|
||||
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
|
||||
ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
|
||||
if (bucket == null)
|
||||
{
|
||||
int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
|
||||
return newByteBuffer(capacity, direct);
|
||||
}
|
||||
ByteBuffer buffer = bucket.acquire(direct);
|
||||
ByteBuffer buffer = bucket.acquire();
|
||||
if (buffer == null)
|
||||
return newByteBuffer(capacity, direct);
|
||||
decrementMemory(buffer);
|
||||
return buffer;
|
||||
}
|
||||
|
|
|
@ -145,6 +145,22 @@ public interface ByteBufferPool
|
|||
_space = maxSize > 0 ? new AtomicInteger(maxSize) : null;
|
||||
}
|
||||
|
||||
public ByteBuffer acquire()
|
||||
{
|
||||
ByteBuffer buffer = queuePoll();
|
||||
if (buffer == null)
|
||||
return null;
|
||||
if (_space != null)
|
||||
_space.incrementAndGet();
|
||||
return buffer;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param direct whether to create a direct buffer when none is available
|
||||
* @return a ByteBuffer
|
||||
* @deprecated use {@link #acquire()} instead
|
||||
*/
|
||||
@Deprecated
|
||||
public ByteBuffer acquire(boolean direct)
|
||||
{
|
||||
ByteBuffer buffer = queuePoll();
|
||||
|
|
|
@ -105,11 +105,14 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
|
|||
public ByteBuffer acquire(int size, boolean direct)
|
||||
{
|
||||
int b = bucketFor(size);
|
||||
int capacity = b * getCapacityFactor();
|
||||
ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
|
||||
Bucket bucket = buffers.get(b);
|
||||
if (bucket == null)
|
||||
return newByteBuffer(b * getCapacityFactor(), direct);
|
||||
ByteBuffer buffer = bucket.acquire(direct);
|
||||
return newByteBuffer(capacity, direct);
|
||||
ByteBuffer buffer = bucket.acquire();
|
||||
if (buffer == null)
|
||||
return newByteBuffer(capacity, direct);
|
||||
decrementMemory(buffer);
|
||||
return buffer;
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue