Issue #1861 - Limit total bytes pooled by ByteBufferPools.

Updated the implementation to track the oldest bucket and
release its buffers when the retained memory is exceeded.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-02-27 12:54:06 +01:00
parent ced0361cab
commit 16933f8df2
6 changed files with 175 additions and 93 deletions

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.annotation.ManagedOperation;
@ -54,24 +55,28 @@ abstract class AbstractByteBufferPool implements ByteBufferPool
protected void decrementMemory(ByteBuffer buffer) protected void decrementMemory(ByteBuffer buffer)
{ {
AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory; updateMemory(buffer, false);
memory.addAndGet(-buffer.capacity());
} }
protected boolean incrementMemory(ByteBuffer buffer) protected void incrementMemory(ByteBuffer buffer)
{ {
boolean direct = buffer.isDirect(); updateMemory(buffer, true);
}
private void updateMemory(ByteBuffer buffer, boolean addOrSub)
{
AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory;
int capacity = buffer.capacity(); int capacity = buffer.capacity();
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory; memory.addAndGet(addOrSub ? capacity : -capacity);
AtomicLong memory = direct ? _directMemory : _heapMemory; }
while (true)
protected void releaseExcessMemory(boolean direct, Consumer<Boolean> clearFn)
{ {
long current = memory.get(); long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
long value = current + capacity; if (maxMemory > 0)
if (maxMemory > 0 && value > maxMemory) {
return false; while (getMemory(direct) > maxMemory)
if (memory.compareAndSet(current, value)) clearFn.accept(direct);
return true;
} }
} }

View File

@ -116,9 +116,14 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
{ {
if (buffer == null) if (buffer == null)
return; return;
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect(), this::newBucket); boolean direct = buffer.isDirect();
if (bucket != null && incrementMemory(buffer)) ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), direct, this::newBucket);
if (bucket != null)
{
bucket.release(buffer); bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
}
} }
private Bucket newBucket(int key) private Bucket newBucket(int key)
@ -143,6 +148,35 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool
} }
} }
private void clearOldestBucket(boolean direct)
{
long oldest = 0;
int index = -1;
Bucket[] buckets = bucketsFor(direct);
long now = System.nanoTime();
for (int i = 0; i < buckets.length; ++i)
{
Bucket bucket = buckets[i];
if (bucket == null)
continue;
long age = now - bucket.getLastUpdate();
if (age > oldest)
{
oldest = age;
index = i;
}
}
if (index >= 0)
{
Bucket bucket = buckets[index];
buckets[index] = null;
// The same bucket may be concurrently
// removed, so we need this null guard.
if (bucket != null)
bucket.clear(this::decrementMemory);
}
}
private int bucketFor(int capacity) private int bucketFor(int capacity)
{ {
return (capacity - 1) / getCapacityFactor(); return (capacity - 1) / getCapacityFactor();

View File

@ -24,6 +24,7 @@ import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -136,13 +137,16 @@ public interface ByteBufferPool
private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>(); private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>();
private final ByteBufferPool _pool; private final ByteBufferPool _pool;
private final int _capacity; private final int _capacity;
private final AtomicInteger _space; private final int _maxSize;
private final AtomicInteger _size;
private long _lastUpdate;
public Bucket(ByteBufferPool pool, int capacity, int maxSize) public Bucket(ByteBufferPool pool, int capacity, int maxSize)
{ {
_pool = pool; _pool = pool;
_capacity = capacity; _capacity = capacity;
_space = maxSize > 0 ? new AtomicInteger(maxSize) : null; _maxSize = maxSize;
_size = maxSize > 0 ? new AtomicInteger() : null;
} }
public ByteBuffer acquire() public ByteBuffer acquire()
@ -150,8 +154,8 @@ public interface ByteBufferPool
ByteBuffer buffer = queuePoll(); ByteBuffer buffer = queuePoll();
if (buffer == null) if (buffer == null)
return null; return null;
if (_space != null) if (_size != null)
_space.incrementAndGet(); _size.decrementAndGet();
return buffer; return buffer;
} }
@ -166,35 +170,42 @@ public interface ByteBufferPool
ByteBuffer buffer = queuePoll(); ByteBuffer buffer = queuePoll();
if (buffer == null) if (buffer == null)
return _pool.newByteBuffer(_capacity, direct); return _pool.newByteBuffer(_capacity, direct);
if (_space != null) if (_size != null)
_space.incrementAndGet(); _size.decrementAndGet();
return buffer; return buffer;
} }
public void release(ByteBuffer buffer) public void release(ByteBuffer buffer)
{ {
_lastUpdate = System.nanoTime();
BufferUtil.clear(buffer); BufferUtil.clear(buffer);
if (_space == null) if (_size == null)
queueOffer(buffer); queueOffer(buffer);
else if (_space.decrementAndGet() >= 0) else if (_size.incrementAndGet() <= _maxSize)
queueOffer(buffer); queueOffer(buffer);
else else
_space.incrementAndGet(); _size.decrementAndGet();
} }
public void clear() public void clear()
{ {
if (_space == null) clear(null);
{
queueClear();
} }
else
void clear(Consumer<ByteBuffer> memoryFn)
{ {
int s = _space.getAndSet(0); int size = _size == null ? 0 : _size.get() - 1;
while (s-- > 0) while (size >= 0)
{ {
if (queuePoll() == null) ByteBuffer buffer = queuePoll();
_space.incrementAndGet(); if (buffer == null)
break;
if (memoryFn != null)
memoryFn.accept(buffer);
if (_size != null)
{
_size.decrementAndGet();
--size;
} }
} }
} }
@ -209,11 +220,6 @@ public interface ByteBufferPool
return _queue.poll(); return _queue.poll();
} }
private void queueClear()
{
_queue.clear();
}
boolean isEmpty() boolean isEmpty()
{ {
return _queue.isEmpty(); return _queue.isEmpty();
@ -224,10 +230,15 @@ public interface ByteBufferPool
return _queue.size(); return _queue.size();
} }
long getLastUpdate()
{
return _lastUpdate;
}
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x{%d/%d}", getClass().getSimpleName(), hashCode(), size(), _capacity); return String.format("%s@%x{%d/%d@%d}", getClass().getSimpleName(), hashCode(), size(), _maxSize, _capacity);
} }
} }
} }

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -128,11 +129,12 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
assert ((capacity % getCapacityFactor()) == 0); assert ((capacity % getCapacityFactor()) == 0);
int b = bucketFor(capacity); int b = bucketFor(capacity);
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect()); boolean direct = buffer.isDirect();
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
Bucket bucket = buckets.computeIfAbsent(b, _newBucket); Bucket bucket = buckets.computeIfAbsent(b, _newBucket);
if (incrementMemory(buffer))
bucket.release(buffer); bucket.release(buffer);
incrementMemory(buffer);
releaseExcessMemory(direct, this::clearOldestBucket);
} }
@Override @Override
@ -145,6 +147,32 @@ public class MappedByteBufferPool extends AbstractByteBufferPool
_heapBuffers.clear(); _heapBuffers.clear();
} }
private void clearOldestBucket(boolean direct)
{
long oldest = 0;
int index = -1;
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(direct);
long now = System.nanoTime();
for (Map.Entry<Integer, Bucket> entry : buckets.entrySet())
{
Bucket bucket = entry.getValue();
long age = now - bucket.getLastUpdate();
if (age > oldest)
{
oldest = age;
index = entry.getKey();
}
}
if (index >= 0)
{
Bucket bucket = buckets.remove(index);
// The same bucket may be concurrently
// removed, so we need this null guard.
if (bucket != null)
bucket.clear(this::decrementMemory);
}
}
private int bucketFor(int size) private int bucketFor(int size)
{ {
int factor = getCapacityFactor(); int factor = getCapacityFactor();

View File

@ -20,9 +20,7 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Objects; import java.util.Objects;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ByteBufferPool.Bucket; import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
@ -33,6 +31,7 @@ import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -184,36 +183,36 @@ public class ArrayByteBufferPoolTest
public void testMaxMemory() public void testMaxMemory()
{ {
int factor = 1024; int factor = 1024;
int maxMemory = 10 * 1024; int maxMemory = 11 * 1024;
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, factor, -1, -1, -1, maxMemory); ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, factor, -1, -1, -1, maxMemory);
Bucket[] buckets = bufferPool.bucketsFor(true);
int capacity = 3 * 1024; // Create the buckets - the oldest is the larger.
ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1]; // 1+2+3+4=10 / maxMemory=11.
for (int i = 0; i < buffers.length; ++i) for (int i = 4; i >= 1; --i)
buffers[i] = bufferPool.acquire(capacity, true); {
int capacity = factor * i;
// Return all the buffers, but only some is retained by the pool.
for (ByteBuffer buffer : buffers)
bufferPool.release(buffer);
List<Bucket> directBuckets = Arrays.stream(bufferPool.bucketsFor(true))
.filter(Objects::nonNull)
.filter(b -> !b.isEmpty())
.collect(Collectors.toList());
assertEquals(1, directBuckets.size());
Bucket bucket = directBuckets.get(0);
assertEquals(buffers.length - 1, bucket.size());
long memory1 = bufferPool.getMemory(true);
assertThat(memory1, lessThanOrEqualTo((long)maxMemory));
ByteBuffer buffer = bufferPool.acquire(capacity, true); ByteBuffer buffer = bufferPool.acquire(capacity, true);
long memory2 = bufferPool.getMemory(true);
assertThat(memory2, lessThan(memory1));
bufferPool.release(buffer); bufferPool.release(buffer);
long memory3 = bufferPool.getMemory(true); }
assertEquals(memory1, memory3);
// Create and release a buffer to exceed the max memory.
ByteBuffer buffer = bufferPool.newByteBuffer(2 * factor, true);
bufferPool.release(buffer);
// Now the oldest buffer should be gone and we have: 1+2x2+3=8
long memory = bufferPool.getMemory(true);
assertThat(memory, lessThan((long)maxMemory));
assertNull(buckets[3]);
// Create and release a large buffer.
// Max memory is exceeded and buckets 3 and 1 are cleared.
// We will have 2x2+7=11.
buffer = bufferPool.newByteBuffer(7 * factor, true);
bufferPool.release(buffer);
memory = bufferPool.getMemory(true);
assertThat(memory, lessThanOrEqualTo((long)maxMemory));
assertNull(buckets[0]);
assertNull(buckets[2]);
} }
} }

View File

@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail; import static org.junit.jupiter.api.Assertions.fail;
@ -163,32 +164,36 @@ public class MappedByteBufferPoolTest
public void testMaxMemory() public void testMaxMemory()
{ {
int factor = 1024; int factor = 1024;
int maxMemory = 10 * 1024; int maxMemory = 11 * 1024;
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory); MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory);
ConcurrentMap<Integer, Bucket> buckets = bufferPool.bucketsFor(true);
int capacity = 3 * 1024; // Create the buckets - the oldest is the larger.
ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1]; // 1+2+3+4=10 / maxMemory=11.
for (int i = 0; i < buffers.length; ++i) for (int i = 4; i >= 1; --i)
buffers[i] = bufferPool.acquire(capacity, true); {
int capacity = factor * i;
// Return all the buffers, but only some is retained by the pool.
for (ByteBuffer buffer : buffers)
bufferPool.release(buffer);
ConcurrentMap<Integer, Bucket> directMap = bufferPool.bucketsFor(true);
assertEquals(1, directMap.size());
Bucket bucket = directMap.values().iterator().next();
assertEquals(buffers.length - 1, bucket.size());
long memory1 = bufferPool.getMemory(true);
assertThat(memory1, lessThanOrEqualTo((long)maxMemory));
ByteBuffer buffer = bufferPool.acquire(capacity, true); ByteBuffer buffer = bufferPool.acquire(capacity, true);
long memory2 = bufferPool.getMemory(true);
assertThat(memory2, lessThan(memory1));
bufferPool.release(buffer); bufferPool.release(buffer);
long memory3 = bufferPool.getMemory(true); }
assertEquals(memory1, memory3);
// Create and release a buffer to exceed the max memory.
ByteBuffer buffer = bufferPool.newByteBuffer(2 * factor, true);
bufferPool.release(buffer);
// Now the oldest buffer should be gone and we have: 1+2x2+3=8
long memory = bufferPool.getMemory(true);
assertThat(memory, lessThan((long)maxMemory));
assertNull(buckets.get(4));
// Create and release a large buffer.
// Max memory is exceeded and buckets 3 and 1 are cleared.
// We will have 2x2+7=11.
buffer = bufferPool.newByteBuffer(7 * factor, true);
bufferPool.release(buffer);
memory = bufferPool.getMemory(true);
assertThat(memory, lessThanOrEqualTo((long)maxMemory));
assertNull(buckets.get(1));
assertNull(buckets.get(3));
} }
} }