Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

Signed-off-by: gregw <gregw@webtide.com>
This commit is contained in:
gregw 2020-08-12 11:42:36 +02:00
commit cd42fd1635
4 changed files with 473 additions and 201 deletions

View File

@ -96,13 +96,13 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
@ManagedAttribute(value = "The number of active connections", readonly = true)
public int getActiveConnectionCount()
{
return pool.getInUseConnectionCount();
return pool.getInUseCount();
}
@ManagedAttribute(value = "The number of idle connections", readonly = true)
public int getIdleConnectionCount()
{
return pool.getIdleConnectionCount();
return pool.getIdleCount();
}
@ManagedAttribute(value = "The max number of connections", readonly = true)
@ -120,7 +120,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
@ManagedAttribute(value = "The number of pending connections", readonly = true)
public int getPendingConnectionCount()
{
return pool.getPendingConnectionCount();
return pool.getReservedCount();
}
@Override
@ -164,21 +164,19 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
{
CompletableFuture<Void> future = new CompletableFuture<>();
if (LOG.isDebugEnabled())
{
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
}
Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null)
{
future.complete(null);
return future;
}
return CompletableFuture.completedFuture(null);
if (LOG.isDebugEnabled())
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
CompletableFuture<Void> future = new CompletableFuture<>();
destination.newConnection(new Promise<>()
{
@Override
@ -186,7 +184,15 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection);
adopt(entry, connection);
if (!(connection instanceof Attachable))
{
failed(new IllegalArgumentException("Invalid connection object: " + connection));
return;
}
((Attachable)connection).setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
future.complete(null);
proceed();
}
@ -196,21 +202,30 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x);
pool.remove(entry);
entry.remove();
future.completeExceptionally(x);
requester.failed(x);
}
});
return future;
}
@Override
public boolean accept(Connection connection)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Pool<Connection>.Entry entry = pool.reserve(-1);
if (entry == null)
return false;
adopt(entry, connection);
if (LOG.isDebugEnabled())
LOG.debug("onCreating {} {}", entry, connection);
Attachable attachable = (Attachable)connection;
attachable.setAttachment(entry);
onCreated(connection);
entry.enable(connection, false);
idle(connection, false);
return true;
}
@ -219,19 +234,6 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
requester.succeeded();
}
private void adopt(Pool<Connection>.Entry entry, Connection connection)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
attachable.setAttachment(entry);
if (LOG.isDebugEnabled())
LOG.debug("onCreating {}", entry);
onCreated(connection);
entry.enable(connection);
idle(connection, false);
}
protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquire();

View File

@ -24,7 +24,7 @@ package org.eclipse.jetty.util;
public interface Attachable
{
/**
* @return the object attached to this stream
* @return the object attached to this instance
* @see #setAttachment(Object)
*/
Object getAttachment();
@ -32,7 +32,7 @@ public interface Attachable
/**
* Attaches the given object to this stream for later retrieval.
*
* @param attachment the object to attach to this stream
* @param attachment the object to attach to this instance
*/
void setAttachment(Object attachment);
}

View File

@ -26,6 +26,8 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.thread.AutoLock;
@ -33,7 +35,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A fast container of poolable objects, with optional support for
* A fast pool of objects, with optional support for
* multiplexing, max usage count and thread-local caching.
* <p>
* The thread-local caching mechanism is about remembering up to N previously
@ -43,6 +45,11 @@ import org.slf4j.LoggerFactory;
* This can greatly speed up acquisition when both the acquisition and the
* release of the entries is done on the same thread as this avoids iterating
* the global, thread-safe collection of entries.
* </p>
* <p>
* When the method {@link #close()} is called, all {@link Closeable}s in the pool
* are also closed.
* </p>
* @param <T>
*/
public class Pool<T> implements AutoCloseable, Dumpable
@ -62,6 +69,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
private final AutoLock lock = new AutoLock();
private final int maxEntries;
private final int cacheSize;
private final AtomicInteger pending = new AtomicInteger();
private volatile boolean closed;
private volatile int maxMultiplex = 1;
private volatile int maxUsageCount = -1;
@ -82,19 +90,19 @@ public class Pool<T> implements AutoCloseable, Dumpable
this.cache = null;
}
public int getPendingConnectionCount()
public int getReservedCount()
{
return (int)sharedList.stream().filter(entry -> entry.getPooled() == null).count();
return pending.get();
}
public int getIdleConnectionCount()
public int getIdleCount()
{
return (int)sharedList.stream().filter(Entry::isIdle).count();
}
public int getInUseConnectionCount()
public int getInUseCount()
{
return (int)sharedList.stream().filter(entry -> !entry.isIdle()).count();
return (int)sharedList.stream().filter(Entry::isInUse).count();
}
public int getMaxEntries()
@ -127,9 +135,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
}
/**
* Create a new disabled slot into the pool. The returned entry
* won't be acquirable as long as {@link Entry#enable(Object)}
* has not been called.
* Create a new disabled slot into the pool.
* The returned entry must ultimately have the {@link Entry#enable(Object, boolean)}
* method called or be removed via {@link Pool.Entry#remove()} or
* {@link Pool#remove(Pool.Entry)}.
*
* @param maxReservations the max desired number of reserved entries,
* or a negative number to always trigger the reservation of a new entry.
@ -139,18 +148,26 @@ public class Pool<T> implements AutoCloseable, Dumpable
*/
public Entry reserve(int maxReservations)
{
if (maxReservations >= 0 && getPendingConnectionCount() >= maxReservations)
return null;
try (AutoLock l = lock.lock())
{
if (!closed && sharedList.size() < maxEntries)
{
Entry entry = new Entry();
sharedList.add(entry);
return entry;
}
return null;
if (closed)
return null;
int space = maxEntries - sharedList.size();
if (space <= 0)
return null;
// The pending count is an AtomicInteger that is only ever incremented here with
// the lock held. Thus the pending count can be reduced immediately after the
// test below, but never incremented. Thus the maxReservations limit can be
// enforced.
if (maxReservations >= 0 && pending.get() >= maxReservations)
return null;
pending.incrementAndGet();
Entry entry = new Entry();
sharedList.add(entry);
return entry;
}
}
@ -180,7 +197,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
/**
* Acquire an entry from the pool.
*
* Only enabled entries will be returned from this method and their enable method must not be called.
* @return an entry from the pool or null if none is available.
*/
public Entry acquire()
@ -209,6 +226,43 @@ public class Pool<T> implements AutoCloseable, Dumpable
return null;
}
/**
* Utility method to acquire an entry from the pool,
* reserving and creating a new entry if necessary.
*
* @param creator a function to create the pooled value for a reserved entry.
* @return an entry from the pool or null if none is available.
*/
public Entry acquire(Function<Pool<T>.Entry, T> creator)
{
Entry entry = acquire();
if (entry != null)
return entry;
entry = reserve(-1);
if (entry == null)
return null;
T value;
try
{
value = creator.apply(entry);
}
catch (Throwable th)
{
remove(entry);
throw th;
}
if (value == null)
{
remove(entry);
return null;
}
return entry.enable(value, true) ? entry : null;
}
/**
* This method will return an acquired object to the pool. Objects
* that are acquired from the pool but never released will result
@ -286,16 +340,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
for (Entry entry : copy)
{
if (entry.tryRemove() && entry.pooled instanceof Closeable)
{
try
{
((Closeable)entry.pooled).close();
}
catch (IOException e)
{
LOGGER.warn("Error closing entry {}", entry, e);
}
}
IO.close((Closeable)entry.pooled);
}
}
@ -323,13 +368,52 @@ public class Pool<T> implements AutoCloseable, Dumpable
public class Entry
{
// hi: positive=open/maxUsage counter,negative=closed lo: multiplexing counter
// hi: positive=open/maxUsage counter; negative=closed; MIN_VALUE pending
// lo: multiplexing counter
private final AtomicBiInteger state;
private volatile T pooled;
public Entry()
// The pooled item. This is not volatile as it is set once and then never changed.
// Other threads accessing must check the state field above first, so a good before/after
// relationship exists to make a memory barrier.
private T pooled;
Entry()
{
this.state = new AtomicBiInteger(-1, 0);
this.state = new AtomicBiInteger(Integer.MIN_VALUE, 0);
}
/** Enable a reserved entry {@link Entry}.
* An entry returned from the {@link #reserve(int)} method must be enabled with this method,
* once and only once, before it is usable by the pool.
* The entry may be enabled and not acquired, in which case it is immediately available to be
* acquired, potentially by another thread; or it can be enabled and acquired atomically so that
* no other thread can acquire it, although the acquire may still fail if the pool has been closed.
* @param pooled The pooled item for the entry
* @param acquire If true the entry is atomically enabled and acquired.
* @return true If the entry was enabled.
* @throws IllegalStateException if the entry was already enabled
*/
public boolean enable(T pooled, boolean acquire)
{
Objects.requireNonNull(pooled);
if (state.getHi() != Integer.MIN_VALUE)
{
if (state.getHi() == -1)
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
this.pooled = pooled;
int usage = acquire ? 1 : 0;
if (!state.compareAndSet(Integer.MIN_VALUE, usage, 0, usage))
{
this.pooled = null;
if (state.getHi() == -1)
return false; // Pool has been closed
throw new IllegalStateException("Entry already enabled: " + this);
}
pending.decrementAndGet();
return true;
}
public T getPooled()
@ -337,13 +421,24 @@ public class Pool<T> implements AutoCloseable, Dumpable
return pooled;
}
public void enable(T pooled)
/**
* Release the entry.
* This is equivalent to calling {@link Pool#release(Pool.Entry)} passing this entry.
* @return true if released.
*/
public boolean release()
{
if (!isClosed())
throw new IllegalStateException("Open entries cannot be enabled : " + this);
Objects.requireNonNull(pooled);
this.pooled = pooled;
state.set(0, 0);
return Pool.this.release(this);
}
/**
* Remove the entry.
* This is equivalent to calling {@link Pool#remove(Pool.Entry)} passing this entry.
* @return true if remove.
*/
public boolean remove()
{
return Pool.this.remove(this);
}
/**
@ -353,7 +448,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
* the multiplex count is maxMultiplex and the entry is not closed,
* false otherwise.
*/
public boolean tryAcquire()
boolean tryAcquire()
{
while (true)
{
@ -376,7 +471,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
* @return true if the entry was released,
* false if {@link #tryRemove()} should be called.
*/
public boolean tryRelease()
boolean tryRelease()
{
int newMultiplexingCount;
int usageCount;
@ -405,7 +500,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
* Try to mark the entry as removed.
* @return true if the entry has to be removed from the containing pool, false otherwise.
*/
public boolean tryRemove()
boolean tryRemove()
{
while (true)
{
@ -416,7 +511,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount);
if (removed)
{
if (usageCount == Integer.MIN_VALUE)
pending.decrementAndGet();
return newMultiplexCount == 0;
}
}
}
@ -427,7 +526,14 @@ public class Pool<T> implements AutoCloseable, Dumpable
public boolean isIdle()
{
return state.getLo() <= 0;
long encoded = state.get();
return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) == 0;
}
public boolean isInUse()
{
long encoded = state.get();
return AtomicBiInteger.getHi(encoded) >= 0 && AtomicBiInteger.getLo(encoded) > 0;
}
public int getUsageCount()
@ -439,8 +545,12 @@ public class Pool<T> implements AutoCloseable, Dumpable
public String toString()
{
long encoded = state.get();
return super.toString() + " stateHi=" + AtomicBiInteger.getHi(encoded) +
" stateLo=" + AtomicBiInteger.getLo(encoded) + " pooled=" + pooled;
return String.format("%s@%x{hi=%d,lo=%d.p=%s}",
getClass().getSimpleName(),
hashCode(),
AtomicBiInteger.getHi(encoded),
AtomicBiInteger.getLo(encoded),
pooled);
}
}
}

View File

@ -19,10 +19,14 @@
package org.eclipse.jetty.util;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
@ -31,36 +35,77 @@ import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class PoolTest
{
@Test
public void testAcquireRelease()
public static Stream<Object[]> cacheSize()
{
Pool<String> pool = new Pool<>(1,0);
pool.reserve(-1).enable("aaa");
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getPooled(), equalTo("aaa"));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
assertThat(pool.acquire(), nullValue());
assertThat(pool.release(e1), is(true));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
assertThrows(IllegalStateException.class, () -> pool.release(e1));
Pool<String>.Entry e2 = pool.acquire();
assertThat(e2.getPooled(), equalTo("aaa"));
assertThat(pool.release(e2), is(true));
assertThrows(NullPointerException.class, () -> pool.release(null));
List<Object[]> data = new ArrayList<>();
data.add(new Object[]{0});
data.add(new Object[]{1});
data.add(new Object[]{2});
return data.stream();
}
@Test
public void testRemoveBeforeRelease()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testCache(int cacheSize)
{
Pool<String> pool = new Pool<>(1,0);
pool.reserve(-1).enable("aaa");
System.err.println(cacheSize);
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testAcquireRelease(int cacheSize)
{
Pool<String> pool = new Pool<>(1,cacheSize);
pool.reserve(-1).enable("aaa", false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getPooled(), equalTo("aaa"));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
assertNull(pool.acquire());
e1.release();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
assertThrows(IllegalStateException.class, e1::release);
Pool<String>.Entry e2 = pool.acquire();
assertThat(e2.getPooled(), equalTo("aaa"));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
pool.release(e2);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
assertThrows(IllegalStateException.class, () -> pool.release(e2));
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemoveBeforeRelease(int cacheSize)
{
Pool<String> pool = new Pool<>(1, cacheSize);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
@ -68,11 +113,12 @@ public class PoolTest
assertThat(pool.release(e1), is(false));
}
@Test
public void testCloseBeforeRelease()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testCloseBeforeRelease(int cacheSize)
{
Pool<String> pool = new Pool<>(1,0);
pool.reserve(-1).enable("aaa");
Pool<String> pool = new Pool<>(1, cacheSize);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.size(), is(1));
@ -81,10 +127,11 @@ public class PoolTest
assertThat(pool.release(e1), is(false));
}
@Test
public void testMaxPoolSize()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMaxPoolSize(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
assertThat(pool.size(), is(0));
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.size(), is(1));
@ -92,42 +139,74 @@ public class PoolTest
assertThat(pool.size(), is(1));
}
@Test
public void testReserve()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReserve(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String>.Entry entry = pool.reserve(-1);
Pool<String> pool = new Pool<>(2, cacheSize);
// Reserve an entry
Pool<String>.Entry e1 = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.acquire(), nullValue());
assertThat(entry.isClosed(), is(true));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));
assertThrows(NullPointerException.class, () -> entry.enable(null));
assertThat(pool.acquire(), nullValue());
assertThat(entry.isClosed(), is(true));
// max reservations
assertNull(pool.reserve(1));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(0));
assertThat(pool.getInUseCount(), is(0));
entry.enable("aaa");
assertThat(entry.isClosed(), is(false));
assertThat(pool.acquire().getPooled(), notNullValue());
assertThrows(IllegalStateException.class, () -> entry.enable("bbb"));
// enable the entry
e1.enable("aaa", false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// Reserve another entry
Pool<String>.Entry e2 = pool.reserve(-1);
assertThat(pool.size(), is(2));
assertThat(pool.remove(e2), is(true));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
pool.reserve(-1);
// remove the reservation
e2.remove();
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// Reserve another entry
Pool<String>.Entry e3 = pool.reserve(-1);
assertThat(pool.size(), is(2));
pool.close();
assertThat(pool.size(), is(0));
assertThat(pool.reserve(-1), nullValue());
assertThat(entry.isClosed(), is(true));
assertThat(pool.getReservedCount(), is(1));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(0));
// enable and acquire the entry
e3.enable("bbb", true);
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getIdleCount(), is(1));
assertThat(pool.getInUseCount(), is(1));
// can't reenable
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
// Can't enable acquired entry
assertThat(pool.acquire(), is(e1));
assertThrows(IllegalStateException.class, () -> e1.enable("xxx", false));
}
@Test
public void testReserveMaxPending()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReserveMaxPending(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String> pool = new Pool<>(2, cacheSize);
assertThat(pool.reserve(0), nullValue());
assertThat(pool.reserve(1), notNullValue());
assertThat(pool.reserve(1), nullValue());
@ -137,20 +216,22 @@ public class PoolTest
assertThat(pool.reserve(-1), nullValue());
}
@Test
public void testReserveNegativeMaxPending()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReserveNegativeMaxPending(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String> pool = new Pool<>(2, cacheSize);
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), nullValue());
}
@Test
public void testClose()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testClose(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
pool.reserve(-1).enable("aaa");
Pool<String> pool = new Pool<>(1, cacheSize);
pool.reserve(-1).enable("aaa", false);
assertThat(pool.isClosed(), is(false));
pool.close();
pool.close();
@ -161,23 +242,25 @@ public class PoolTest
assertThat(pool.reserve(-1), nullValue());
}
@Test
public void testClosingCloseable()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testClosingCloseable(int cacheSize)
{
AtomicBoolean closed = new AtomicBoolean();
Pool<Closeable> pool = new Pool<>(1,0);
Closeable pooled = () -> closed.set(true);
pool.reserve(-1).enable(pooled);
pool.reserve(-1).enable(pooled, false);
assertThat(closed.get(), is(false));
pool.close();
assertThat(closed.get(), is(true));
}
@Test
public void testRemove()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemove(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
pool.reserve(-1).enable("aaa");
Pool<String> pool = new Pool<>(1, cacheSize);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
@ -187,39 +270,42 @@ public class PoolTest
assertThrows(NullPointerException.class, () -> pool.remove(null));
}
@Test
public void testValuesSize()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testValuesSize(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String> pool = new Pool<>(2, cacheSize);
assertThat(pool.size(), is(0));
assertThat(pool.values().isEmpty(), is(true));
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
assertThat(pool.size(), is(2));
}
@Test
public void testValuesContainsAcquiredEntries()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testValuesContainsAcquiredEntries(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String> pool = new Pool<>(2, cacheSize);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
assertThat(pool.acquire(), notNullValue());
assertThat(pool.acquire(), notNullValue());
assertThat(pool.acquire(), nullValue());
assertThat(pool.values().isEmpty(), is(false));
}
@Test
public void testAcquireAt()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testAcquireAt(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String> pool = new Pool<>(2, cacheSize);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
assertThat(pool.acquireAt(2), nullValue());
assertThat(pool.acquireAt(0), notNullValue());
@ -228,12 +314,13 @@ public class PoolTest
assertThat(pool.acquireAt(1), nullValue());
}
@Test
public void testMaxUsageCount()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMaxUsageCount(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.release(e1), is(true));
@ -250,13 +337,14 @@ public class PoolTest
assertThat(pool.release(e1Copy), is(false));
}
@Test
public void testMaxMultiplex()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMaxMultiplex(int cacheSize)
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String> pool = new Pool<>(2, cacheSize);
pool.setMaxMultiplex(3);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
@ -273,12 +361,13 @@ public class PoolTest
assertThat(e4, sameInstance(e6));
}
@Test
public void testRemoveMultiplexed()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemoveMultiplexed(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
@ -297,12 +386,13 @@ public class PoolTest
assertThat(pool.remove(e1), is(false));
}
@Test
public void testMultiplexRemoveThenAcquireThenReleaseRemove()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexRemoveThenAcquireThenReleaseRemove(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
@ -314,24 +404,26 @@ public class PoolTest
assertThat(pool.remove(e2), is(true));
}
@Test
public void testNonMultiplexRemoveAfterAcquire()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testNonMultiplexRemoveAfterAcquire(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testMultiplexRemoveAfterAcquire()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexRemoveAfterAcquire(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
@ -350,10 +442,11 @@ public class PoolTest
assertThat(pool.size(), is(0));
}
@Test
public void testReleaseThenRemoveNonEnabledEntry()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReleaseThenRemoveNonEnabledEntry(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String>.Entry e = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.release(e), is(false));
@ -362,23 +455,25 @@ public class PoolTest
assertThat(pool.size(), is(0));
}
@Test
public void testRemoveNonEnabledEntry()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemoveNonEnabledEntry(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String>.Entry e = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.remove(e), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testMultiplexMaxUsageReachedAcquireThenRemove()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexMaxUsageReachedAcquireThenRemove(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e0 = pool.acquire();
@ -392,13 +487,14 @@ public class PoolTest
assertThat(pool.size(), is(0));
}
@Test
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e0 = pool.acquire();
@ -416,13 +512,14 @@ public class PoolTest
assertThat(pool.size(), is(0));
}
@Test
public void testUsageCountAfterReachingMaxMultiplexLimit()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testUsageCountAfterReachingMaxMultiplexLimit(int cacheSize)
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String> pool = new Pool<>(1, cacheSize);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(10);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getUsageCount(), is(1));
@ -432,11 +529,74 @@ public class PoolTest
assertThat(e1.getUsageCount(), is(2));
}
@Test
public void testConfigLimits()
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testConfigLimits(int cacheSize)
{
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(0));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(-1));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxUsageCount(0));
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testAcquireWithCreator(int cacheSize)
{
Pool<String> pool = new Pool<>(2, cacheSize);
assertThat(pool.size(), is(0));
assertThat(pool.acquire(e -> null), nullValue());
assertThat(pool.size(), is(0));
Pool<String>.Entry e1 = pool.acquire(e -> "e1");
assertThat(e1.getPooled(), is("e1"));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
assertThat(pool.acquire(e -> null), nullValue());
assertThat(pool.size(), is(1));
Pool<String>.Entry e2 = pool.acquire(e -> "e2");
assertThat(e2.getPooled(), is("e2"));
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(2));
Pool<String>.Entry e3 = pool.acquire(e -> "e3");
assertThat(e3, nullValue());
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(2));
assertThat(pool.acquire(e ->
{
throw new IllegalStateException();
}), nullValue());
e2.release();
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
Pool<String>.Entry e4 = pool.acquire(e -> "e4");
assertThat(e4.getPooled(), is("e2"));
assertThat(pool.size(), is(2));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(2));
pool.remove(e1);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
assertThrows(IllegalStateException.class, () -> pool.acquire(e ->
{
throw new IllegalStateException();
}));
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
}
}