From 16933f8df27d5f0b37632de794e685aa5b73cb77 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Wed, 27 Feb 2019 12:54:06 +0100 Subject: [PATCH] Issue #1861 - Limit total bytes pooled by ByteBufferPools. Updated the implementation to track the oldest bucket and release its buffers when the retained memory is exceeded. Signed-off-by: Simone Bordet --- .../jetty/io/AbstractByteBufferPool.java | 29 +++++---- .../eclipse/jetty/io/ArrayByteBufferPool.java | 38 +++++++++++- .../org/eclipse/jetty/io/ByteBufferPool.java | 61 +++++++++++-------- .../jetty/io/MappedByteBufferPool.java | 36 +++++++++-- .../jetty/io/ArrayByteBufferPoolTest.java | 55 ++++++++--------- .../jetty/io/MappedByteBufferPoolTest.java | 49 ++++++++------- 6 files changed, 175 insertions(+), 93 deletions(-) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java index c3f31a7f835..8a89c1dbd28 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.util.concurrent.atomic.AtomicLong; +import java.util.function.Consumer; import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedOperation; @@ -54,24 +55,28 @@ abstract class AbstractByteBufferPool implements ByteBufferPool protected void decrementMemory(ByteBuffer buffer) { - AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory; - memory.addAndGet(-buffer.capacity()); + updateMemory(buffer, false); } - protected boolean incrementMemory(ByteBuffer buffer) + protected void incrementMemory(ByteBuffer buffer) { - boolean direct = buffer.isDirect(); + updateMemory(buffer, true); + } + + private void updateMemory(ByteBuffer buffer, boolean addOrSub) + { + AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory; int capacity = buffer.capacity(); + memory.addAndGet(addOrSub ? capacity : -capacity); + } + + protected void releaseExcessMemory(boolean direct, Consumer clearFn) + { long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory; - AtomicLong memory = direct ? _directMemory : _heapMemory; - while (true) + if (maxMemory > 0) { - long current = memory.get(); - long value = current + capacity; - if (maxMemory > 0 && value > maxMemory) - return false; - if (memory.compareAndSet(current, value)) - return true; + while (getMemory(direct) > maxMemory) + clearFn.accept(direct); } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java index 76abb023674..971f8bb5332 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java @@ -116,9 +116,14 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool { if (buffer == null) return; - ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect(), this::newBucket); - if (bucket != null && incrementMemory(buffer)) + boolean direct = buffer.isDirect(); + ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), direct, this::newBucket); + if (bucket != null) + { bucket.release(buffer); + incrementMemory(buffer); + releaseExcessMemory(direct, this::clearOldestBucket); + } } private Bucket newBucket(int key) @@ -143,6 +148,35 @@ public class ArrayByteBufferPool extends AbstractByteBufferPool } } + private void clearOldestBucket(boolean direct) + { + long oldest = 0; + int index = -1; + Bucket[] buckets = bucketsFor(direct); + long now = System.nanoTime(); + for (int i = 0; i < buckets.length; ++i) + { + Bucket bucket = buckets[i]; + if (bucket == null) + continue; + long age = now - bucket.getLastUpdate(); + if (age > oldest) + { + oldest = age; + index = i; + } + } + if (index >= 0) + { + Bucket bucket = buckets[index]; + buckets[index] = null; + // The same bucket may be concurrently + // removed, so we need this null guard. + if (bucket != null) + bucket.clear(this::decrementMemory); + } + } + private int bucketFor(int capacity) { return (capacity - 1) / getCapacityFactor(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java index 352a1f18117..4d8a6837a04 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java @@ -24,6 +24,7 @@ import java.util.Deque; import java.util.List; import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import org.eclipse.jetty.util.BufferUtil; @@ -60,7 +61,7 @@ public interface ByteBufferPool *

Creates a new ByteBuffer of the given capacity and the given directness.

* * @param capacity the ByteBuffer capacity - * @param direct the ByteBuffer directness + * @param direct the ByteBuffer directness * @return a newly allocated ByteBuffer */ default ByteBuffer newByteBuffer(int capacity, boolean direct) @@ -136,13 +137,16 @@ public interface ByteBufferPool private final Deque _queue = new ConcurrentLinkedDeque<>(); private final ByteBufferPool _pool; private final int _capacity; - private final AtomicInteger _space; + private final int _maxSize; + private final AtomicInteger _size; + private long _lastUpdate; public Bucket(ByteBufferPool pool, int capacity, int maxSize) { _pool = pool; _capacity = capacity; - _space = maxSize > 0 ? new AtomicInteger(maxSize) : null; + _maxSize = maxSize; + _size = maxSize > 0 ? new AtomicInteger() : null; } public ByteBuffer acquire() @@ -150,8 +154,8 @@ public interface ByteBufferPool ByteBuffer buffer = queuePoll(); if (buffer == null) return null; - if (_space != null) - _space.incrementAndGet(); + if (_size != null) + _size.decrementAndGet(); return buffer; } @@ -166,35 +170,42 @@ public interface ByteBufferPool ByteBuffer buffer = queuePoll(); if (buffer == null) return _pool.newByteBuffer(_capacity, direct); - if (_space != null) - _space.incrementAndGet(); + if (_size != null) + _size.decrementAndGet(); return buffer; } public void release(ByteBuffer buffer) { + _lastUpdate = System.nanoTime(); BufferUtil.clear(buffer); - if (_space == null) + if (_size == null) queueOffer(buffer); - else if (_space.decrementAndGet() >= 0) + else if (_size.incrementAndGet() <= _maxSize) queueOffer(buffer); else - _space.incrementAndGet(); + _size.decrementAndGet(); } public void clear() { - if (_space == null) + clear(null); + } + + void clear(Consumer memoryFn) + { + int size = _size == null ? 0 : _size.get() - 1; + while (size >= 0) { - queueClear(); - } - else - { - int s = _space.getAndSet(0); - while (s-- > 0) + ByteBuffer buffer = queuePoll(); + if (buffer == null) + break; + if (memoryFn != null) + memoryFn.accept(buffer); + if (_size != null) { - if (queuePoll() == null) - _space.incrementAndGet(); + _size.decrementAndGet(); + --size; } } } @@ -209,11 +220,6 @@ public interface ByteBufferPool return _queue.poll(); } - private void queueClear() - { - _queue.clear(); - } - boolean isEmpty() { return _queue.isEmpty(); @@ -224,10 +230,15 @@ public interface ByteBufferPool return _queue.size(); } + long getLastUpdate() + { + return _lastUpdate; + } + @Override public String toString() { - return String.format("%s@%x{%d/%d}", getClass().getSimpleName(), hashCode(), size(), _capacity); + return String.format("%s@%x{%d/%d@%d}", getClass().getSimpleName(), hashCode(), size(), _maxSize, _capacity); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java index e4448ab84ae..98772e1a765 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; @@ -128,11 +129,12 @@ public class MappedByteBufferPool extends AbstractByteBufferPool assert ((capacity % getCapacityFactor()) == 0); int b = bucketFor(capacity); - ConcurrentMap buckets = bucketsFor(buffer.isDirect()); + boolean direct = buffer.isDirect(); + ConcurrentMap buckets = bucketsFor(direct); Bucket bucket = buckets.computeIfAbsent(b, _newBucket); - - if (incrementMemory(buffer)) - bucket.release(buffer); + bucket.release(buffer); + incrementMemory(buffer); + releaseExcessMemory(direct, this::clearOldestBucket); } @Override @@ -145,6 +147,32 @@ public class MappedByteBufferPool extends AbstractByteBufferPool _heapBuffers.clear(); } + private void clearOldestBucket(boolean direct) + { + long oldest = 0; + int index = -1; + ConcurrentMap buckets = bucketsFor(direct); + long now = System.nanoTime(); + for (Map.Entry entry : buckets.entrySet()) + { + Bucket bucket = entry.getValue(); + long age = now - bucket.getLastUpdate(); + if (age > oldest) + { + oldest = age; + index = entry.getKey(); + } + } + if (index >= 0) + { + Bucket bucket = buckets.remove(index); + // The same bucket may be concurrently + // removed, so we need this null guard. + if (bucket != null) + bucket.clear(this::decrementMemory); + } + } + private int bucketFor(int size) { int factor = getCapacityFactor(); diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java index 99b2fb00fa9..eabdfe6f7bc 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java @@ -20,9 +20,7 @@ package org.eclipse.jetty.io; import java.nio.ByteBuffer; import java.util.Arrays; -import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; import org.eclipse.jetty.io.ByteBufferPool.Bucket; import org.junit.jupiter.api.Test; @@ -33,6 +31,7 @@ import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotSame; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -184,36 +183,36 @@ public class ArrayByteBufferPoolTest public void testMaxMemory() { int factor = 1024; - int maxMemory = 10 * 1024; + int maxMemory = 11 * 1024; ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, factor, -1, -1, -1, maxMemory); + Bucket[] buckets = bufferPool.bucketsFor(true); - int capacity = 3 * 1024; - ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1]; - for (int i = 0; i < buffers.length; ++i) - buffers[i] = bufferPool.acquire(capacity, true); - - // Return all the buffers, but only some is retained by the pool. - for (ByteBuffer buffer : buffers) + // Create the buckets - the oldest is the larger. + // 1+2+3+4=10 / maxMemory=11. + for (int i = 4; i >= 1; --i) + { + int capacity = factor * i; + ByteBuffer buffer = bufferPool.acquire(capacity, true); bufferPool.release(buffer); + } - List directBuckets = Arrays.stream(bufferPool.bucketsFor(true)) - .filter(Objects::nonNull) - .filter(b -> !b.isEmpty()) - .collect(Collectors.toList()); - assertEquals(1, directBuckets.size()); - - Bucket bucket = directBuckets.get(0); - assertEquals(buffers.length - 1, bucket.size()); - - long memory1 = bufferPool.getMemory(true); - assertThat(memory1, lessThanOrEqualTo((long)maxMemory)); - - ByteBuffer buffer = bufferPool.acquire(capacity, true); - long memory2 = bufferPool.getMemory(true); - assertThat(memory2, lessThan(memory1)); - + // Create and release a buffer to exceed the max memory. + ByteBuffer buffer = bufferPool.newByteBuffer(2 * factor, true); bufferPool.release(buffer); - long memory3 = bufferPool.getMemory(true); - assertEquals(memory1, memory3); + + // Now the oldest buffer should be gone and we have: 1+2x2+3=8 + long memory = bufferPool.getMemory(true); + assertThat(memory, lessThan((long)maxMemory)); + assertNull(buckets[3]); + + // Create and release a large buffer. + // Max memory is exceeded and buckets 3 and 1 are cleared. + // We will have 2x2+7=11. + buffer = bufferPool.newByteBuffer(7 * factor, true); + bufferPool.release(buffer); + memory = bufferPool.getMemory(true); + assertThat(memory, lessThanOrEqualTo((long)maxMemory)); + assertNull(buckets[0]); + assertNull(buckets[2]); } } diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java index 5a7e7a0d5e8..5bd536b48cf 100644 --- a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java +++ b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java @@ -32,6 +32,7 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.fail; @@ -163,32 +164,36 @@ public class MappedByteBufferPoolTest public void testMaxMemory() { int factor = 1024; - int maxMemory = 10 * 1024; + int maxMemory = 11 * 1024; MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory); + ConcurrentMap buckets = bufferPool.bucketsFor(true); - int capacity = 3 * 1024; - ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1]; - for (int i = 0; i < buffers.length; ++i) - buffers[i] = bufferPool.acquire(capacity, true); - - // Return all the buffers, but only some is retained by the pool. - for (ByteBuffer buffer : buffers) + // Create the buckets - the oldest is the larger. + // 1+2+3+4=10 / maxMemory=11. + for (int i = 4; i >= 1; --i) + { + int capacity = factor * i; + ByteBuffer buffer = bufferPool.acquire(capacity, true); bufferPool.release(buffer); + } - ConcurrentMap directMap = bufferPool.bucketsFor(true); - assertEquals(1, directMap.size()); - Bucket bucket = directMap.values().iterator().next(); - assertEquals(buffers.length - 1, bucket.size()); - - long memory1 = bufferPool.getMemory(true); - assertThat(memory1, lessThanOrEqualTo((long)maxMemory)); - - ByteBuffer buffer = bufferPool.acquire(capacity, true); - long memory2 = bufferPool.getMemory(true); - assertThat(memory2, lessThan(memory1)); - + // Create and release a buffer to exceed the max memory. + ByteBuffer buffer = bufferPool.newByteBuffer(2 * factor, true); bufferPool.release(buffer); - long memory3 = bufferPool.getMemory(true); - assertEquals(memory1, memory3); + + // Now the oldest buffer should be gone and we have: 1+2x2+3=8 + long memory = bufferPool.getMemory(true); + assertThat(memory, lessThan((long)maxMemory)); + assertNull(buckets.get(4)); + + // Create and release a large buffer. + // Max memory is exceeded and buckets 3 and 1 are cleared. + // We will have 2x2+7=11. + buffer = bufferPool.newByteBuffer(7 * factor, true); + bufferPool.release(buffer); + memory = bufferPool.getMemory(true); + assertThat(memory, lessThanOrEqualTo((long)maxMemory)); + assertNull(buckets.get(1)); + assertNull(buckets.get(3)); } }