From 7323b19f6ffd80c6f1dda180432a8b9cfb74e124 Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Tue, 26 Feb 2019 10:56:30 +0100
Subject: [PATCH] Issue #1861 - Limit total bytes pooled by ByteBufferPools.
Implemented a limit for the total memory retained by the
ByteBufferPool for both direct and heap buffers.
Signed-off-by: Simone Bordet
---
.../jetty/io/AbstractByteBufferPool.java | 93 +++++++++++++++
.../eclipse/jetty/io/ArrayByteBufferPool.java | 95 ++++++++++-----
.../org/eclipse/jetty/io/ByteBufferPool.java | 2 +-
.../jetty/io/MappedByteBufferPool.java | 81 +++++++++----
.../jetty/io/ArrayByteBufferPoolTest.java | 109 +++++++++++++-----
.../jetty/io/MappedByteBufferPoolTest.java | 35 ++++++
6 files changed, 326 insertions(+), 89 deletions(-)
create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
new file mode 100644
index 00000000000..442924306ee
--- /dev/null
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractByteBufferPool.java
@@ -0,0 +1,93 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.io;
+
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.eclipse.jetty.util.annotation.ManagedObject;
+import org.eclipse.jetty.util.annotation.ManagedOperation;
+
+@ManagedObject
+abstract class AbstractByteBufferPool implements ByteBufferPool
+{
+ private final int _factor;
+ private final int _maxQueueLength;
+ private final long _maxHeapMemory;
+ private final AtomicLong _heapMemory = new AtomicLong();
+ private final long _maxDirectMemory;
+ private final AtomicLong _directMemory = new AtomicLong();
+
+ protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
+ {
+ _factor = factor <= 0 ? 1024 : factor;
+ _maxQueueLength = maxQueueLength;
+ _maxHeapMemory = maxHeapMemory;
+ _maxDirectMemory = maxDirectMemory;
+ }
+
+ protected int getCapacityFactor()
+ {
+ return _factor;
+ }
+
+ protected int getMaxQueueLength()
+ {
+ return _maxQueueLength;
+ }
+
+ protected void decrementMemory(ByteBuffer buffer)
+ {
+ AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory;
+ memory.addAndGet(-buffer.capacity());
+ }
+
+ protected boolean incrementMemory(ByteBuffer buffer)
+ {
+ boolean direct = buffer.isDirect();
+ int capacity = buffer.capacity();
+ long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
+ AtomicLong memory = direct ? _directMemory : _heapMemory;
+ if (maxMemory <= 0)
+ return true;
+ while (true)
+ {
+ long current = memory.get();
+ long value = current + capacity;
+ if (value > maxMemory)
+ return false;
+ if (memory.compareAndSet(current, value))
+ return true;
+ }
+ }
+
+ @ManagedOperation(value = "Returns the memory occupied by this ByteBufferPool in bytes", impact = "INFO")
+ public long getMemory(boolean direct)
+ {
+ AtomicLong memory = direct ? _directMemory : _heapMemory;
+ return memory.get();
+ }
+
+ @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
+ public void clear()
+ {
+ _heapMemory.set(0);
+ _directMemory.set(0);
+ }
+}
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
index 5c742839abd..b1911b9b2c1 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ArrayByteBufferPool.java
@@ -19,9 +19,9 @@
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
+import java.util.function.IntFunction;
import org.eclipse.jetty.util.annotation.ManagedObject;
-import org.eclipse.jetty.util.annotation.ManagedOperation;
/**
* A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.
@@ -30,19 +30,18 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
* 2048, and so on.
*/
@ManagedObject
-public class ArrayByteBufferPool implements ByteBufferPool
+public class ArrayByteBufferPool extends AbstractByteBufferPool
{
private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect;
- private final int _factor;
/**
* Creates a new ArrayByteBufferPool with a default configuration.
*/
public ArrayByteBufferPool()
{
- this(-1, -1, -1, -1);
+ this(-1, -1, -1);
}
/**
@@ -54,7 +53,7 @@ public class ArrayByteBufferPool implements ByteBufferPool
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
{
- this(minCapacity, factor, maxCapacity, -1);
+ this(minCapacity, factor, maxCapacity, -1, -1, -1);
}
/**
@@ -67,68 +66,100 @@ public class ArrayByteBufferPool implements ByteBufferPool
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{
+ this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1);
+ }
+
+ /**
+ * Creates a new ArrayByteBufferPool with the given configuration.
+ *
+ * @param minCapacity the minimum ByteBuffer capacity
+ * @param factor the capacity factor
+ * @param maxCapacity the maximum ByteBuffer capacity
+ * @param maxQueueLength the maximum ByteBuffer queue length
+ * @param maxHeapMemory the max heap memory in bytes
+ * @param maxDirectMemory the max direct memory in bytes
+ */
+ public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
+ {
+ super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
+
+ factor = getCapacityFactor();
if (minCapacity <= 0)
minCapacity = 0;
- if (factor <= 0)
- factor = 1024;
if (maxCapacity <= 0)
maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_minCapacity = minCapacity;
- _factor = factor;
int length = maxCapacity / factor;
_direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length];
-
- int capacity = 0;
- for (int i = 0; i < _direct.length; ++i)
- {
- capacity += _factor;
- _direct[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength);
- _indirect[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength);
- }
}
@Override
public ByteBuffer acquire(int size, boolean direct)
{
- ByteBufferPool.Bucket bucket = bucketFor(size, direct);
+ ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
if (bucket == null)
- return newByteBuffer(size, direct);
- return bucket.acquire(direct);
+ {
+ int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
+ return newByteBuffer(capacity, direct);
+ }
+ ByteBuffer buffer = bucket.acquire(direct);
+ decrementMemory(buffer);
+ return buffer;
}
@Override
public void release(ByteBuffer buffer)
{
- if (buffer != null)
- {
- ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect());
- if (bucket != null)
- bucket.release(buffer);
- }
+ if (buffer == null)
+ return;
+ ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect(), this::newBucket);
+ if (bucket != null && incrementMemory(buffer))
+ bucket.release(buffer);
}
- @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
+ private Bucket newBucket(int key)
+ {
+ return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
+ }
+
+ @Override
public void clear()
{
+ super.clear();
for (int i = 0; i < _direct.length; ++i)
{
- _direct[i].clear();
- _indirect[i].clear();
+ Bucket bucket = _direct[i];
+ if (bucket != null)
+ bucket.clear();
+ _direct[i] = null;
+ bucket = _indirect[i];
+ if (bucket != null)
+ bucket.clear();
+ _indirect[i] = null;
}
}
- private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct)
+ private int bucketFor(int capacity)
{
- if (capacity <= _minCapacity)
+ return (capacity - 1) / getCapacityFactor();
+ }
+
+ private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction newBucket)
+ {
+ if (capacity < _minCapacity)
return null;
- int b = (capacity - 1) / _factor;
+ int b = bucketFor(capacity);
if (b >= _direct.length)
return null;
- return bucketsFor(direct)[b];
+ Bucket[] buckets = bucketsFor(direct);
+ Bucket bucket = buckets[b];
+ if (bucket == null && newBucket != null)
+ buckets[b] = bucket = newBucket.apply(b + 1);
+ return bucket;
}
// Package local for testing
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
index 138affb9327..1c58efdf444 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteBufferPool.java
@@ -131,7 +131,7 @@ public interface ByteBufferPool
}
}
- class Bucket
+ public static class Bucket
{
private final Deque _queue = new ConcurrentLinkedDeque<>();
private final ByteBufferPool _pool;
diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java
index 07f1d57d7cb..0ddc2ed6300 100644
--- a/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java
+++ b/jetty-io/src/main/java/org/eclipse/jetty/io/MappedByteBufferPool.java
@@ -22,10 +22,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedObject;
-import org.eclipse.jetty.util.annotation.ManagedOperation;
/**
* A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.
@@ -34,12 +34,11 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
* queue of ByteBuffers each of capacity 2048, and so on.
*/
@ManagedObject
-public class MappedByteBufferPool implements ByteBufferPool
+public class MappedByteBufferPool extends AbstractByteBufferPool
{
- private final ConcurrentMap directBuffers = new ConcurrentHashMap<>();
- private final ConcurrentMap heapBuffers = new ConcurrentHashMap<>();
- private final int _factor;
- private final int _maxQueueLength;
+ private final ConcurrentMap _directBuffers = new ConcurrentHashMap<>();
+ private final ConcurrentMap _heapBuffers = new ConcurrentHashMap<>();
+ private final Function _newBucket;
/**
* Creates a new MappedByteBufferPool with a default configuration.
@@ -60,20 +59,46 @@ public class MappedByteBufferPool implements ByteBufferPool
}
/**
- * Creates a new MappedByteBufferPool with the given capacity factor and max queue length.
+ * Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
*/
public MappedByteBufferPool(int factor, int maxQueueLength)
{
- _factor = factor <= 0 ? 1024 : factor;
- _maxQueueLength = maxQueueLength;
+ this(factor, maxQueueLength, null);
+ }
+
+ /**
+ * Creates a new MappedByteBufferPool with the given configuration.
+ *
+ * @param factor the capacity factor
+ * @param maxQueueLength the maximum ByteBuffer queue length
+ * @param newBucket the function that creates a Bucket
+ */
+ public MappedByteBufferPool(int factor, int maxQueueLength, Function newBucket)
+ {
+ this(factor, maxQueueLength, newBucket, -1, -1);
+ }
+
+ /**
+ * Creates a new MappedByteBufferPool with the given configuration.
+ *
+ * @param factor the capacity factor
+ * @param maxQueueLength the maximum ByteBuffer queue length
+ * @param newBucket the function that creates a Bucket
+ * @param maxHeapMemory the max heap memory in bytes
+ * @param maxDirectMemory the max direct memory in bytes
+ */
+ public MappedByteBufferPool(int factor, int maxQueueLength, Function newBucket, long maxHeapMemory, long maxDirectMemory)
+ {
+ super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
+ _newBucket = newBucket != null ? newBucket : this::newBucket;
}
private Bucket newBucket(int key)
{
- return new Bucket(this, key * _factor, _maxQueueLength);
+ return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
}
@Override
@@ -83,8 +108,10 @@ public class MappedByteBufferPool implements ByteBufferPool
ConcurrentMap buffers = bucketsFor(direct);
Bucket bucket = buffers.get(b);
if (bucket == null)
- return newByteBuffer(b * _factor, direct);
- return bucket.acquire(direct);
+ return newByteBuffer(b * getCapacityFactor(), direct);
+ ByteBuffer buffer = bucket.acquire(direct);
+ decrementMemory(buffer);
+ return buffer;
}
@Override
@@ -93,29 +120,33 @@ public class MappedByteBufferPool implements ByteBufferPool
if (buffer == null)
return; // nothing to do
- // validate that this buffer is from this pool
- assert ((buffer.capacity() % _factor) == 0);
+ int capacity = buffer.capacity();
+ // Validate that this buffer is from this pool.
+ assert ((capacity % getCapacityFactor()) == 0);
- int b = bucketFor(buffer.capacity());
+ int b = bucketFor(capacity);
ConcurrentMap buckets = bucketsFor(buffer.isDirect());
+ Bucket bucket = buckets.computeIfAbsent(b, _newBucket);
- Bucket bucket = buckets.computeIfAbsent(b, this::newBucket);
- bucket.release(buffer);
+ if (incrementMemory(buffer))
+ bucket.release(buffer);
}
- @ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
+ @Override
public void clear()
{
- directBuffers.values().forEach(Bucket::clear);
- directBuffers.clear();
- heapBuffers.values().forEach(Bucket::clear);
- heapBuffers.clear();
+ super.clear();
+ _directBuffers.values().forEach(Bucket::clear);
+ _directBuffers.clear();
+ _heapBuffers.values().forEach(Bucket::clear);
+ _heapBuffers.clear();
}
private int bucketFor(int size)
{
- int bucket = size / _factor;
- if (size % _factor > 0)
+ int factor = getCapacityFactor();
+ int bucket = size / factor;
+ if (bucket * factor != size)
++bucket;
return bucket;
}
@@ -123,7 +154,7 @@ public class MappedByteBufferPool implements ByteBufferPool
// Package local for testing
ConcurrentMap bucketsFor(boolean direct)
{
- return direct ? directBuffers : heapBuffers;
+ return direct ? _directBuffers : _heapBuffers;
}
public static class Tagged extends MappedByteBufferPool
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
index f6f912759db..99b2fb00fa9 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/ArrayByteBufferPoolTest.java
@@ -20,12 +20,17 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import java.util.stream.Collectors;
import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame;
@@ -46,12 +51,18 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertEquals(size, buffer.capacity());
for (ByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket.isEmpty());
+ {
+ if (bucket != null)
+ assertTrue(bucket.isEmpty());
+ }
bufferPool.release(buffer);
for (ByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket.isEmpty());
+ {
+ if (bucket != null)
+ assertTrue(bucket.isEmpty());
+ }
}
}
@@ -69,15 +80,17 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket.isEmpty());
+ {
+ if (bucket != null)
+ assertTrue(bucket.isEmpty());
+ }
bufferPool.release(buffer);
- int pooled = 0;
- for (ByteBufferPool.Bucket bucket : buckets)
- {
- pooled += bucket.size();
- }
+ int pooled = Arrays.stream(buckets)
+ .filter(Objects::nonNull)
+ .mapToInt(Bucket::size)
+ .sum();
assertEquals(size <= 1000, 1 == pooled);
}
}
@@ -96,20 +109,17 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets)
- assertTrue(bucket.isEmpty());
+ {
+ if (bucket != null)
+ assertTrue(bucket.isEmpty());
+ }
bufferPool.release(buffer);
- int pooled = 0;
- for (ByteBufferPool.Bucket bucket : buckets)
- {
- if (!bucket.isEmpty())
- {
- pooled += bucket.size();
- // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
- // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
- }
- }
+ int pooled = Arrays.stream(buckets)
+ .filter(Objects::nonNull)
+ .mapToInt(Bucket::size)
+ .sum();
assertEquals(1, pooled);
}
}
@@ -130,16 +140,10 @@ public class ArrayByteBufferPoolTest
ByteBuffer buffer3 = bufferPool.acquire(size, false);
bufferPool.release(buffer3);
- int pooled = 0;
- for (ByteBufferPool.Bucket bucket : buckets)
- {
- if (!bucket.isEmpty())
- {
- pooled += bucket.size();
- // TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
- // TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
- }
- }
+ int pooled = Arrays.stream(buckets)
+ .filter(Objects::nonNull)
+ .mapToInt(Bucket::size)
+ .sum();
assertEquals(1, pooled);
assertSame(buffer1, buffer2);
@@ -157,10 +161,16 @@ public class ArrayByteBufferPoolTest
ByteBuffer buffer3 = bufferPool.acquire(512, false);
Bucket[] buckets = bufferPool.bucketsFor(false);
- Arrays.asList(buckets).forEach(b -> assertEquals(0, b.size()));
+ Arrays.stream(buckets)
+ .filter(Objects::nonNull)
+ .forEach(b -> assertEquals(0, b.size()));
bufferPool.release(buffer1);
- Bucket bucket = Arrays.stream(buckets).filter(b -> b.size() > 0).findFirst().get();
+ Bucket bucket = Arrays.stream(buckets)
+ .filter(Objects::nonNull)
+ .filter(b -> b.size() > 0)
+ .findFirst()
+ .orElseThrow(AssertionError::new);
assertEquals(1, bucket.size());
bufferPool.release(buffer2);
@@ -169,4 +179,41 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer3);
assertEquals(2, bucket.size());
}
+
+ @Test
+ public void testMaxMemory()
+ {
+ int factor = 1024;
+ int maxMemory = 10 * 1024;
+ ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, factor, -1, -1, -1, maxMemory);
+
+ int capacity = 3 * 1024;
+ ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1];
+ for (int i = 0; i < buffers.length; ++i)
+ buffers[i] = bufferPool.acquire(capacity, true);
+
+ // Return all the buffers, but only some is retained by the pool.
+ for (ByteBuffer buffer : buffers)
+ bufferPool.release(buffer);
+
+ List directBuckets = Arrays.stream(bufferPool.bucketsFor(true))
+ .filter(Objects::nonNull)
+ .filter(b -> !b.isEmpty())
+ .collect(Collectors.toList());
+ assertEquals(1, directBuckets.size());
+
+ Bucket bucket = directBuckets.get(0);
+ assertEquals(buffers.length - 1, bucket.size());
+
+ long memory1 = bufferPool.getMemory(true);
+ assertThat(memory1, lessThanOrEqualTo((long)maxMemory));
+
+ ByteBuffer buffer = bufferPool.acquire(capacity, true);
+ long memory2 = bufferPool.getMemory(true);
+ assertThat(memory2, lessThan(memory1));
+
+ bufferPool.release(buffer);
+ long memory3 = bufferPool.getMemory(true);
+ assertEquals(memory1, memory3);
+ }
}
diff --git a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java
index c6281767690..5a7e7a0d5e8 100644
--- a/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java
+++ b/jetty-io/src/test/java/org/eclipse/jetty/io/MappedByteBufferPoolTest.java
@@ -29,6 +29,8 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -156,4 +158,37 @@ public class MappedByteBufferPoolTest
bufferPool.release(buffer3);
assertEquals(2, bucket.size());
}
+
+ @Test
+ public void testMaxMemory()
+ {
+ int factor = 1024;
+ int maxMemory = 10 * 1024;
+ MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory);
+
+ int capacity = 3 * 1024;
+ ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1];
+ for (int i = 0; i < buffers.length; ++i)
+ buffers[i] = bufferPool.acquire(capacity, true);
+
+ // Return all the buffers, but only some is retained by the pool.
+ for (ByteBuffer buffer : buffers)
+ bufferPool.release(buffer);
+
+ ConcurrentMap directMap = bufferPool.bucketsFor(true);
+ assertEquals(1, directMap.size());
+ Bucket bucket = directMap.values().iterator().next();
+ assertEquals(buffers.length - 1, bucket.size());
+
+ long memory1 = bufferPool.getMemory(true);
+ assertThat(memory1, lessThanOrEqualTo((long)maxMemory));
+
+ ByteBuffer buffer = bufferPool.acquire(capacity, true);
+ long memory2 = bufferPool.getMemory(true);
+ assertThat(memory2, lessThan(memory1));
+
+ bufferPool.release(buffer);
+ long memory3 = bufferPool.getMemory(true);
+ assertEquals(memory1, memory3);
+ }
}