Issue #1861 - Limit total bytes pooled by ByteBufferPools.

Implemented a limit for the total memory retained by the
ByteBufferPool for both direct and heap buffers.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2019-02-26 10:56:30 +01:00
parent f2d15f12d5
commit 7323b19f6f
6 changed files with 326 additions and 89 deletions

View File

@ -0,0 +1,93 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.io;
import java.nio.ByteBuffer;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
@ManagedObject
abstract class AbstractByteBufferPool implements ByteBufferPool
{
private final int _factor;
private final int _maxQueueLength;
private final long _maxHeapMemory;
private final AtomicLong _heapMemory = new AtomicLong();
private final long _maxDirectMemory;
private final AtomicLong _directMemory = new AtomicLong();
protected AbstractByteBufferPool(int factor, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
_factor = factor <= 0 ? 1024 : factor;
_maxQueueLength = maxQueueLength;
_maxHeapMemory = maxHeapMemory;
_maxDirectMemory = maxDirectMemory;
}
protected int getCapacityFactor()
{
return _factor;
}
protected int getMaxQueueLength()
{
return _maxQueueLength;
}
protected void decrementMemory(ByteBuffer buffer)
{
AtomicLong memory = buffer.isDirect() ? _directMemory : _heapMemory;
memory.addAndGet(-buffer.capacity());
}
protected boolean incrementMemory(ByteBuffer buffer)
{
boolean direct = buffer.isDirect();
int capacity = buffer.capacity();
long maxMemory = direct ? _maxDirectMemory : _maxHeapMemory;
AtomicLong memory = direct ? _directMemory : _heapMemory;
if (maxMemory <= 0)
return true;
while (true)
{
long current = memory.get();
long value = current + capacity;
if (value > maxMemory)
return false;
if (memory.compareAndSet(current, value))
return true;
}
}
@ManagedOperation(value = "Returns the memory occupied by this ByteBufferPool in bytes", impact = "INFO")
public long getMemory(boolean direct)
{
AtomicLong memory = direct ? _directMemory : _heapMemory;
return memory.get();
}
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION")
public void clear()
{
_heapMemory.set(0);
_directMemory.set(0);
}
}

View File

@ -19,9 +19,9 @@
package org.eclipse.jetty.io; package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.IntFunction;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
/** /**
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p> * <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in array elements.</p>
@ -30,19 +30,18 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
* 2048, and so on.</p> * 2048, and so on.</p>
*/ */
@ManagedObject @ManagedObject
public class ArrayByteBufferPool implements ByteBufferPool public class ArrayByteBufferPool extends AbstractByteBufferPool
{ {
private final int _minCapacity; private final int _minCapacity;
private final ByteBufferPool.Bucket[] _direct; private final ByteBufferPool.Bucket[] _direct;
private final ByteBufferPool.Bucket[] _indirect; private final ByteBufferPool.Bucket[] _indirect;
private final int _factor;
/** /**
* Creates a new ArrayByteBufferPool with a default configuration. * Creates a new ArrayByteBufferPool with a default configuration.
*/ */
public ArrayByteBufferPool() public ArrayByteBufferPool()
{ {
this(-1, -1, -1, -1); this(-1, -1, -1);
} }
/** /**
@ -54,7 +53,7 @@ public class ArrayByteBufferPool implements ByteBufferPool
*/ */
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity) public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity)
{ {
this(minCapacity, factor, maxCapacity, -1); this(minCapacity, factor, maxCapacity, -1, -1, -1);
} }
/** /**
@ -67,68 +66,100 @@ public class ArrayByteBufferPool implements ByteBufferPool
*/ */
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength) public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength)
{ {
this(minCapacity, factor, maxCapacity, maxQueueLength, -1, -1);
}
/**
* Creates a new ArrayByteBufferPool with the given configuration.
*
* @param minCapacity the minimum ByteBuffer capacity
* @param factor the capacity factor
* @param maxCapacity the maximum ByteBuffer capacity
* @param maxQueueLength the maximum ByteBuffer queue length
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
*/
public ArrayByteBufferPool(int minCapacity, int factor, int maxCapacity, int maxQueueLength, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
factor = getCapacityFactor();
if (minCapacity <= 0) if (minCapacity <= 0)
minCapacity = 0; minCapacity = 0;
if (factor <= 0)
factor = 1024;
if (maxCapacity <= 0) if (maxCapacity <= 0)
maxCapacity = 64 * 1024; maxCapacity = 64 * 1024;
if ((maxCapacity % factor) != 0 || factor >= maxCapacity) if ((maxCapacity % factor) != 0 || factor >= maxCapacity)
throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity"); throw new IllegalArgumentException("The capacity factor must be a divisor of maxCapacity");
_minCapacity = minCapacity; _minCapacity = minCapacity;
_factor = factor;
int length = maxCapacity / factor; int length = maxCapacity / factor;
_direct = new ByteBufferPool.Bucket[length]; _direct = new ByteBufferPool.Bucket[length];
_indirect = new ByteBufferPool.Bucket[length]; _indirect = new ByteBufferPool.Bucket[length];
int capacity = 0;
for (int i = 0; i < _direct.length; ++i)
{
capacity += _factor;
_direct[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength);
_indirect[i] = new ByteBufferPool.Bucket(this, capacity, maxQueueLength);
}
} }
@Override @Override
public ByteBuffer acquire(int size, boolean direct) public ByteBuffer acquire(int size, boolean direct)
{ {
ByteBufferPool.Bucket bucket = bucketFor(size, direct); ByteBufferPool.Bucket bucket = bucketFor(size, direct, null);
if (bucket == null) if (bucket == null)
return newByteBuffer(size, direct); {
return bucket.acquire(direct); int capacity = size < _minCapacity ? size : (bucketFor(size) + 1) * getCapacityFactor();
return newByteBuffer(capacity, direct);
}
ByteBuffer buffer = bucket.acquire(direct);
decrementMemory(buffer);
return buffer;
} }
@Override @Override
public void release(ByteBuffer buffer) public void release(ByteBuffer buffer)
{ {
if (buffer != null) if (buffer == null)
{ return;
ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect()); ByteBufferPool.Bucket bucket = bucketFor(buffer.capacity(), buffer.isDirect(), this::newBucket);
if (bucket != null) if (bucket != null && incrementMemory(buffer))
bucket.release(buffer); bucket.release(buffer);
}
} }
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") private Bucket newBucket(int key)
{
return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
}
@Override
public void clear() public void clear()
{ {
super.clear();
for (int i = 0; i < _direct.length; ++i) for (int i = 0; i < _direct.length; ++i)
{ {
_direct[i].clear(); Bucket bucket = _direct[i];
_indirect[i].clear(); if (bucket != null)
bucket.clear();
_direct[i] = null;
bucket = _indirect[i];
if (bucket != null)
bucket.clear();
_indirect[i] = null;
} }
} }
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct) private int bucketFor(int capacity)
{ {
if (capacity <= _minCapacity) return (capacity - 1) / getCapacityFactor();
}
private ByteBufferPool.Bucket bucketFor(int capacity, boolean direct, IntFunction<Bucket> newBucket)
{
if (capacity < _minCapacity)
return null; return null;
int b = (capacity - 1) / _factor; int b = bucketFor(capacity);
if (b >= _direct.length) if (b >= _direct.length)
return null; return null;
return bucketsFor(direct)[b]; Bucket[] buckets = bucketsFor(direct);
Bucket bucket = buckets[b];
if (bucket == null && newBucket != null)
buckets[b] = bucket = newBucket.apply(b + 1);
return bucket;
} }
// Package local for testing // Package local for testing

View File

@ -131,7 +131,7 @@ public interface ByteBufferPool
} }
} }
class Bucket public static class Bucket
{ {
private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>(); private final Deque<ByteBuffer> _queue = new ConcurrentLinkedDeque<>();
private final ByteBufferPool _pool; private final ByteBufferPool _pool;

View File

@ -22,10 +22,10 @@ import java.nio.ByteBuffer;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.annotation.ManagedOperation;
/** /**
* <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.</p> * <p>A ByteBuffer pool where ByteBuffers are held in queues that are held in a Map.</p>
@ -34,12 +34,11 @@ import org.eclipse.jetty.util.annotation.ManagedOperation;
* queue of ByteBuffers each of capacity 2048, and so on.</p> * queue of ByteBuffers each of capacity 2048, and so on.</p>
*/ */
@ManagedObject @ManagedObject
public class MappedByteBufferPool implements ByteBufferPool public class MappedByteBufferPool extends AbstractByteBufferPool
{ {
private final ConcurrentMap<Integer, Bucket> directBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Bucket> _directBuffers = new ConcurrentHashMap<>();
private final ConcurrentMap<Integer, Bucket> heapBuffers = new ConcurrentHashMap<>(); private final ConcurrentMap<Integer, Bucket> _heapBuffers = new ConcurrentHashMap<>();
private final int _factor; private final Function<Integer, Bucket> _newBucket;
private final int _maxQueueLength;
/** /**
* Creates a new MappedByteBufferPool with a default configuration. * Creates a new MappedByteBufferPool with a default configuration.
@ -60,20 +59,46 @@ public class MappedByteBufferPool implements ByteBufferPool
} }
/** /**
* Creates a new MappedByteBufferPool with the given capacity factor and max queue length. * Creates a new MappedByteBufferPool with the given configuration.
* *
* @param factor the capacity factor * @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length * @param maxQueueLength the maximum ByteBuffer queue length
*/ */
public MappedByteBufferPool(int factor, int maxQueueLength) public MappedByteBufferPool(int factor, int maxQueueLength)
{ {
_factor = factor <= 0 ? 1024 : factor; this(factor, maxQueueLength, null);
_maxQueueLength = maxQueueLength; }
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param newBucket the function that creates a Bucket
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket)
{
this(factor, maxQueueLength, newBucket, -1, -1);
}
/**
* Creates a new MappedByteBufferPool with the given configuration.
*
* @param factor the capacity factor
* @param maxQueueLength the maximum ByteBuffer queue length
* @param newBucket the function that creates a Bucket
* @param maxHeapMemory the max heap memory in bytes
* @param maxDirectMemory the max direct memory in bytes
*/
public MappedByteBufferPool(int factor, int maxQueueLength, Function<Integer, Bucket> newBucket, long maxHeapMemory, long maxDirectMemory)
{
super(factor, maxQueueLength, maxHeapMemory, maxDirectMemory);
_newBucket = newBucket != null ? newBucket : this::newBucket;
} }
private Bucket newBucket(int key) private Bucket newBucket(int key)
{ {
return new Bucket(this, key * _factor, _maxQueueLength); return new Bucket(this, key * getCapacityFactor(), getMaxQueueLength());
} }
@Override @Override
@ -83,8 +108,10 @@ public class MappedByteBufferPool implements ByteBufferPool
ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct); ConcurrentMap<Integer, Bucket> buffers = bucketsFor(direct);
Bucket bucket = buffers.get(b); Bucket bucket = buffers.get(b);
if (bucket == null) if (bucket == null)
return newByteBuffer(b * _factor, direct); return newByteBuffer(b * getCapacityFactor(), direct);
return bucket.acquire(direct); ByteBuffer buffer = bucket.acquire(direct);
decrementMemory(buffer);
return buffer;
} }
@Override @Override
@ -93,29 +120,33 @@ public class MappedByteBufferPool implements ByteBufferPool
if (buffer == null) if (buffer == null)
return; // nothing to do return; // nothing to do
// validate that this buffer is from this pool int capacity = buffer.capacity();
assert ((buffer.capacity() % _factor) == 0); // Validate that this buffer is from this pool.
assert ((capacity % getCapacityFactor()) == 0);
int b = bucketFor(buffer.capacity()); int b = bucketFor(capacity);
ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect()); ConcurrentMap<Integer, Bucket> buckets = bucketsFor(buffer.isDirect());
Bucket bucket = buckets.computeIfAbsent(b, _newBucket);
Bucket bucket = buckets.computeIfAbsent(b, this::newBucket); if (incrementMemory(buffer))
bucket.release(buffer); bucket.release(buffer);
} }
@ManagedOperation(value = "Clears this ByteBufferPool", impact = "ACTION") @Override
public void clear() public void clear()
{ {
directBuffers.values().forEach(Bucket::clear); super.clear();
directBuffers.clear(); _directBuffers.values().forEach(Bucket::clear);
heapBuffers.values().forEach(Bucket::clear); _directBuffers.clear();
heapBuffers.clear(); _heapBuffers.values().forEach(Bucket::clear);
_heapBuffers.clear();
} }
private int bucketFor(int size) private int bucketFor(int size)
{ {
int bucket = size / _factor; int factor = getCapacityFactor();
if (size % _factor > 0) int bucket = size / factor;
if (bucket * factor != size)
++bucket; ++bucket;
return bucket; return bucket;
} }
@ -123,7 +154,7 @@ public class MappedByteBufferPool implements ByteBufferPool
// Package local for testing // Package local for testing
ConcurrentMap<Integer, Bucket> bucketsFor(boolean direct) ConcurrentMap<Integer, Bucket> bucketsFor(boolean direct)
{ {
return direct ? directBuffers : heapBuffers; return direct ? _directBuffers : _heapBuffers;
} }
public static class Tagged extends MappedByteBufferPool public static class Tagged extends MappedByteBufferPool

View File

@ -20,12 +20,17 @@ package org.eclipse.jetty.io;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ByteBufferPool.Bucket; import org.eclipse.jetty.io.ByteBufferPool.Bucket;
import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotSame; import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
@ -46,12 +51,18 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertEquals(size, buffer.capacity()); assertEquals(size, buffer.capacity());
for (ByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty()); {
if (bucket != null)
assertTrue(bucket.isEmpty());
}
bufferPool.release(buffer); bufferPool.release(buffer);
for (ByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty()); {
if (bucket != null)
assertTrue(bucket.isEmpty());
}
} }
} }
@ -69,15 +80,17 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size)); assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty()); {
if (bucket != null)
assertTrue(bucket.isEmpty());
}
bufferPool.release(buffer); bufferPool.release(buffer);
int pooled = 0; int pooled = Arrays.stream(buckets)
for (ByteBufferPool.Bucket bucket : buckets) .filter(Objects::nonNull)
{ .mapToInt(Bucket::size)
pooled += bucket.size(); .sum();
}
assertEquals(size <= 1000, 1 == pooled); assertEquals(size <= 1000, 1 == pooled);
} }
} }
@ -96,20 +109,17 @@ public class ArrayByteBufferPoolTest
assertTrue(buffer.isDirect()); assertTrue(buffer.isDirect());
assertThat(buffer.capacity(), greaterThanOrEqualTo(size)); assertThat(buffer.capacity(), greaterThanOrEqualTo(size));
for (ByteBufferPool.Bucket bucket : buckets) for (ByteBufferPool.Bucket bucket : buckets)
assertTrue(bucket.isEmpty()); {
if (bucket != null)
assertTrue(bucket.isEmpty());
}
bufferPool.release(buffer); bufferPool.release(buffer);
int pooled = 0; int pooled = Arrays.stream(buckets)
for (ByteBufferPool.Bucket bucket : buckets) .filter(Objects::nonNull)
{ .mapToInt(Bucket::size)
if (!bucket.isEmpty()) .sum();
{
pooled += bucket.size();
// TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
// TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
}
}
assertEquals(1, pooled); assertEquals(1, pooled);
} }
} }
@ -130,16 +140,10 @@ public class ArrayByteBufferPoolTest
ByteBuffer buffer3 = bufferPool.acquire(size, false); ByteBuffer buffer3 = bufferPool.acquire(size, false);
bufferPool.release(buffer3); bufferPool.release(buffer3);
int pooled = 0; int pooled = Arrays.stream(buckets)
for (ByteBufferPool.Bucket bucket : buckets) .filter(Objects::nonNull)
{ .mapToInt(Bucket::size)
if (!bucket.isEmpty()) .sum();
{
pooled += bucket.size();
// TODO assertThat(bucket._bufferSize,greaterThanOrEqualTo(size));
// TODO assertThat(bucket._bufferSize,Matchers.lessThan(size+100));
}
}
assertEquals(1, pooled); assertEquals(1, pooled);
assertSame(buffer1, buffer2); assertSame(buffer1, buffer2);
@ -157,10 +161,16 @@ public class ArrayByteBufferPoolTest
ByteBuffer buffer3 = bufferPool.acquire(512, false); ByteBuffer buffer3 = bufferPool.acquire(512, false);
Bucket[] buckets = bufferPool.bucketsFor(false); Bucket[] buckets = bufferPool.bucketsFor(false);
Arrays.asList(buckets).forEach(b -> assertEquals(0, b.size())); Arrays.stream(buckets)
.filter(Objects::nonNull)
.forEach(b -> assertEquals(0, b.size()));
bufferPool.release(buffer1); bufferPool.release(buffer1);
Bucket bucket = Arrays.stream(buckets).filter(b -> b.size() > 0).findFirst().get(); Bucket bucket = Arrays.stream(buckets)
.filter(Objects::nonNull)
.filter(b -> b.size() > 0)
.findFirst()
.orElseThrow(AssertionError::new);
assertEquals(1, bucket.size()); assertEquals(1, bucket.size());
bufferPool.release(buffer2); bufferPool.release(buffer2);
@ -169,4 +179,41 @@ public class ArrayByteBufferPoolTest
bufferPool.release(buffer3); bufferPool.release(buffer3);
assertEquals(2, bucket.size()); assertEquals(2, bucket.size());
} }
@Test
public void testMaxMemory()
{
int factor = 1024;
int maxMemory = 10 * 1024;
ArrayByteBufferPool bufferPool = new ArrayByteBufferPool(-1, factor, -1, -1, -1, maxMemory);
int capacity = 3 * 1024;
ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1];
for (int i = 0; i < buffers.length; ++i)
buffers[i] = bufferPool.acquire(capacity, true);
// Return all the buffers, but only some is retained by the pool.
for (ByteBuffer buffer : buffers)
bufferPool.release(buffer);
List<Bucket> directBuckets = Arrays.stream(bufferPool.bucketsFor(true))
.filter(Objects::nonNull)
.filter(b -> !b.isEmpty())
.collect(Collectors.toList());
assertEquals(1, directBuckets.size());
Bucket bucket = directBuckets.get(0);
assertEquals(buffers.length - 1, bucket.size());
long memory1 = bufferPool.getMemory(true);
assertThat(memory1, lessThanOrEqualTo((long)maxMemory));
ByteBuffer buffer = bufferPool.acquire(capacity, true);
long memory2 = bufferPool.getMemory(true);
assertThat(memory2, lessThan(memory1));
bufferPool.release(buffer);
long memory3 = bufferPool.getMemory(true);
assertEquals(memory1, memory3);
}
} }

View File

@ -29,6 +29,8 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.greaterThanOrEqualTo; import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.lessThan;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertSame; import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -156,4 +158,37 @@ public class MappedByteBufferPoolTest
bufferPool.release(buffer3); bufferPool.release(buffer3);
assertEquals(2, bucket.size()); assertEquals(2, bucket.size());
} }
@Test
public void testMaxMemory()
{
int factor = 1024;
int maxMemory = 10 * 1024;
MappedByteBufferPool bufferPool = new MappedByteBufferPool(factor, -1, null, -1, maxMemory);
int capacity = 3 * 1024;
ByteBuffer[] buffers = new ByteBuffer[maxMemory / capacity + 1];
for (int i = 0; i < buffers.length; ++i)
buffers[i] = bufferPool.acquire(capacity, true);
// Return all the buffers, but only some is retained by the pool.
for (ByteBuffer buffer : buffers)
bufferPool.release(buffer);
ConcurrentMap<Integer, Bucket> directMap = bufferPool.bucketsFor(true);
assertEquals(1, directMap.size());
Bucket bucket = directMap.values().iterator().next();
assertEquals(buffers.length - 1, bucket.size());
long memory1 = bufferPool.getMemory(true);
assertThat(memory1, lessThanOrEqualTo((long)maxMemory));
ByteBuffer buffer = bufferPool.acquire(capacity, true);
long memory2 = bufferPool.getMemory(true);
assertThat(memory2, lessThan(memory1));
bufferPool.release(buffer);
long memory3 = bufferPool.getMemory(true);
assertEquals(memory1, memory3);
}
} }