From 0dec883fdad0951b212545ce76deda748c0c4957 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Fri, 11 Dec 2020 16:05:04 +0100 Subject: [PATCH] implement connection pool max duration --- .../jetty/client/AbstractConnectionPool.java | 134 +++++--- .../jetty/client/ConnectionPoolTest.java | 119 +++++++ .../http/MultiplexedConnectionPoolTest.java | 309 ++++++++++++++++++ .../java/org/eclipse/jetty/util/Pool.java | 10 +- .../java/org/eclipse/jetty/util/PoolTest.java | 270 +++++++-------- 5 files changed, 673 insertions(+), 169 deletions(-) create mode 100644 jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 31d98ef3e18..386926fff0b 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -23,8 +23,10 @@ import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Collection; import java.util.List; +import java.util.Objects; import java.util.Queue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Collectors; @@ -32,6 +34,7 @@ import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.Attachable; import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.annotation.ManagedAttribute; @@ -54,6 +57,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen private final Callback requester; private final Pool pool; private boolean maximizeConnections; + private volatile long maxDurationNanos = 0L; /** * @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead @@ -105,6 +109,27 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])); } + /** + *

Get the max usage duration in milliseconds of the pool's connections. + * Values {@code 0} and negative mean that there is no limit.

+ *

This only guarantees that a connection cannot be acquired after the configured + * duration elapses, so that is only enforced when {@link #acquire()} is called. + * If a pool stays completely idle for a duration longer than the value + * returned by this method, the max duration will not be enforced. + * It's up to the idle timeout mechanism (see {@link HttpClient#getIdleTimeout()}) + * to handle closing idle connections.

+ */ + @ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed") + public long getMaxDuration() + { + return TimeUnit.NANOSECONDS.toMillis(maxDurationNanos); + } + + public void setMaxDuration(long maxDurationInMs) + { + this.maxDurationNanos = TimeUnit.MILLISECONDS.toNanos(maxDurationInMs); + } + protected int getMaxMultiplex() { return pool.getMaxMultiplex(); @@ -290,16 +315,35 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen protected Connection activate() { - Pool.Entry entry = pool.acquire(); - if (entry != null) + while (true) { - if (LOG.isDebugEnabled()) - LOG.debug("Activated {} {}", entry, pool); - Connection connection = entry.getPooled(); - acquired(connection); - return connection; + Pool.Entry entry = pool.acquire(); + if (entry != null) + { + Connection connection = entry.getPooled(); + + long maxDurationNanos = this.maxDurationNanos; + if (maxDurationNanos > 0L) + { + EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment(); + if (holder.isExpired(maxDurationNanos)) + { + boolean canClose = remove(connection, true); + if (canClose) + IO.close(connection); + if (LOG.isDebugEnabled()) + LOG.debug("Connection removed{} due to expiration {} {}", (canClose ? " and closed" : ""), entry, pool); + continue; + } + } + + if (LOG.isDebugEnabled()) + LOG.debug("Activated {} {}", entry, pool); + acquired(connection); + return connection; + } + return null; } - return null; } @Override @@ -308,11 +352,10 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen if (!(connection instanceof Attachable)) throw new IllegalArgumentException("Invalid connection object: " + connection); Attachable attachable = (Attachable)connection; - @SuppressWarnings("unchecked") - Pool.Entry entry = (Pool.Entry)attachable.getAttachment(); - if (entry == null) + EntryHolder holder = (EntryHolder)attachable.getAttachment(); + if (holder == null) return false; - return !entry.isIdle(); + return !holder.entry.isIdle(); } @Override @@ -329,13 +372,12 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen if (!(connection instanceof Attachable)) throw new IllegalArgumentException("Invalid connection object: " + connection); Attachable attachable = (Attachable)connection; - @SuppressWarnings("unchecked") - Pool.Entry entry = (Pool.Entry)attachable.getAttachment(); - if (entry == null) + EntryHolder holder = (EntryHolder)attachable.getAttachment(); + if (holder == null) return true; - boolean reusable = pool.release(entry); + boolean reusable = pool.release(holder.entry); if (LOG.isDebugEnabled()) - LOG.debug("Released ({}) {} {}", reusable, entry, pool); + LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool); if (reusable) return true; remove(connection); @@ -353,14 +395,14 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen if (!(connection instanceof Attachable)) throw new IllegalArgumentException("Invalid connection object: " + connection); Attachable attachable = (Attachable)connection; - @SuppressWarnings("unchecked") - Pool.Entry entry = (Pool.Entry)attachable.getAttachment(); - if (entry == null) + EntryHolder holder = (EntryHolder)attachable.getAttachment(); + if (holder == null) return false; - attachable.setAttachment(null); - boolean removed = pool.remove(entry); + boolean removed = pool.remove(holder.entry); + if (removed) + attachable.setAttachment(null); if (LOG.isDebugEnabled()) - LOG.debug("Removed ({}) {} {}", removed, entry, pool); + LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool); if (removed || force) { released(connection); @@ -433,20 +475,22 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen @Override public boolean sweep() { - pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry -> - { - Connection connection = entry.getPooled(); - if (((Sweeper.Sweepable)connection).sweep()) + pool.values().stream() + .map(Pool.Entry::getPooled) + .filter(connection -> connection instanceof Sweeper.Sweepable) + .forEach(connection -> { - boolean removed = remove(connection); - LOG.warn("Connection swept: {}{}{} from active connections{}{}", - connection, - System.lineSeparator(), - removed ? "Removed" : "Not removed", - System.lineSeparator(), - dump()); - } - }); + if (((Sweeper.Sweepable)connection).sweep()) + { + boolean removed = remove(connection); + LOG.warn("Connection swept: {}{}{} from active connections{}{}", + connection, + System.lineSeparator(), + removed ? "Removed" : "Not removed", + System.lineSeparator(), + dump()); + } + }); return false; } @@ -480,7 +524,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen LOG.debug("Connection creation succeeded {}: {}", reserved, connection); if (connection instanceof Attachable) { - ((Attachable)connection).setAttachment(reserved); + ((Attachable)connection).setAttachment(new EntryHolder(reserved)); onCreated(connection); pending.decrementAndGet(); reserved.enable(connection, false); @@ -507,4 +551,20 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen requester.failed(x); } } + + private static class EntryHolder + { + private final Pool.Entry entry; + private final long creationTimestamp = System.nanoTime(); + + private EntryHolder(Pool.Entry entry) + { + this.entry = Objects.requireNonNull(entry); + } + + private boolean isExpired(long timeoutNanos) + { + return System.nanoTime() - creationTimestamp >= timeoutNanos; + } + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index e6afe43e15c..2313a8e3244 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -26,12 +26,14 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.IntStream; import java.util.stream.Stream; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; @@ -50,6 +52,7 @@ import org.eclipse.jetty.util.SocketAddressResolver; import org.eclipse.jetty.util.thread.QueuedThreadPool; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.MethodSource; @@ -73,6 +76,12 @@ public class ConnectionPoolTest { return Stream.of( new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), + new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination); + pool.setMaxDuration(10); + return pool; + }), new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)), new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) ); @@ -432,6 +441,116 @@ public class ConnectionPoolTest assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count)); } + @Test + public void testMaxDurationConnectionsWithConstrainedPool() throws Exception + { + // ConnectionPool may NOT open more connections than expected because + // it is constrained to a single connection in this test. + + final int maxConnections = 1; + final int maxDuration = 30; + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + // Constrain the max pool size to 1. + DuplexConnectionPool pool = new DuplexConnectionPool(destination, maxConnections, destination) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + pool.setMaxDuration(maxDuration); + return pool; + }); + + startServer(new EmptyServerHandler()); + + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.start(); + + // Use the connection pool 5 times with a delay that is longer than the max duration in between each time. + for (int i = 0; i < 5; i++) + { + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + assertThat(response.getStatus(), Matchers.is(200)); + + Thread.sleep(maxDuration * 2); + } + + // Check that the pool created 5 and removed 4 connections; + // it must be exactly 4 removed b/c each cycle of the loop + // can only open 1 connection as the pool is constrained to + // maximum 1 connection. + assertThat(poolCreateCounter.get(), Matchers.is(5)); + assertThat(poolRemoveCounter.get(), Matchers.is(4)); + } + + @Test + public void testMaxDurationConnectionsWithUnconstrainedPool() throws Exception + { + // ConnectionPools may open a few more connections than expected. + + final int maxDuration = 30; + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + pool.setMaxDuration(maxDuration); + return pool; + }); + + startServer(new EmptyServerHandler()); + + HttpClientTransport transport = new HttpClientTransportOverHTTP(1); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.start(); + + // Use the connection pool 5 times with a delay that is longer than the max duration in between each time. + for (int i = 0; i < 5; i++) + { + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + assertThat(response.getStatus(), Matchers.is(200)); + + Thread.sleep(maxDuration * 2); + } + + // Check that the pool created 5 and removed at least 4 connections; + // it can be more than 4 removed b/c each cycle of the loop may + // open more than 1 connection as the pool is not constrained. + assertThat(poolCreateCounter.get(), Matchers.is(5)); + assertThat(poolRemoveCounter.get(), Matchers.greaterThanOrEqualTo(4)); + } + @ParameterizedTest @MethodSource("pools") public void testConnectionMaxUsage(ConnectionPoolFactory factory) throws Exception diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java new file mode 100644 index 00000000000..f08069bb106 --- /dev/null +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java @@ -0,0 +1,309 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client.http; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.ConnectionPool; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Pool; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; + +// Sibling of ConnectionPoolTest, but using H2 to multiplex connections. +public class MultiplexedConnectionPoolTest +{ + private static final int MAX_MULTIPLEX = 2; + + private Server server; + private ServerConnector connector; + private HttpClient client; + + private void startServer(Handler handler) throws Exception + { + server = new Server(); + HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration()); + http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX); + connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + + @AfterEach + public void disposeServer() throws Exception + { + connector = null; + if (server != null) + { + server.stop(); + server = null; + } + } + + @AfterEach + public void disposeClient() throws Exception + { + if (client != null) + { + client.stop(); + client = null; + } + } + + @Test + public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception + { + final int maxDuration = 30; + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + AtomicReference> poolRef = new AtomicReference<>(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); + Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); + poolRef.set(pool); + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + connectionPool.setMaxDuration(maxDuration); + return connectionPool; + }); + + startServer(new EmptyServerHandler()); + + HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client()); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.start(); + + // Use the connection pool 5 times with a delay that is longer than the max duration in between each time. + for (int i = 0; i < 5; i++) + { + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .timeout(5, TimeUnit.SECONDS) + .send(); + assertThat(response.getStatus(), Matchers.is(200)); + + // Check that the pool never grows above 1. + assertThat(poolRef.get().size(), is(1)); + + Thread.sleep(maxDuration * 2); + } + + // Check that the pool created 5 and removed 4 connections; + // it must be exactly 4 removed b/c while the pool is not + // constrained, it can multiplex requests on a single connection + // so that should prevent opening more connections than needed. + assertThat(poolCreateCounter.get(), is(5)); + assertThat(poolRemoveCounter.get(), is(4)); + } + + @Test + public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnectionWhileStillInUse() throws Exception + { + final int maxDuration = 1000; + final int maxIdle = 2000; + + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + AtomicReference> poolRef = new AtomicReference<>(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); + Pool pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false); + poolRef.set(pool); + MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + connectionPool.setMaxDuration(maxDuration); + return connectionPool; + }); + + Semaphore handlerSignalingSemaphore = new Semaphore(0); + Semaphore handlerWaitingSemaphore = new Semaphore(0); + startServer(new EmptyServerHandler() + { + @Override + protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + if (!target.equals("/block")) + return; + + handlerSignalingSemaphore.release(); + + try + { + handlerWaitingSemaphore.acquire(); + } + catch (Exception e) + { + throw new ServletException(e); + } + } + }); + + HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client()); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport, null); + client.setIdleTimeout(maxIdle); + client.start(); + + CountDownLatch latch1 = new CountDownLatch(1); + CountDownLatch latch2 = new CountDownLatch(2); + // create 2 requests that are going to consume all the multiplexing slots + client.newRequest("localhost", connector.getLocalPort()) + .path("/block") + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + if (result.isSucceeded()) + { + latch1.countDown(); + latch2.countDown(); + } + }); + + // wait for the 1st request to be serviced to make sure only 1 connection gets created + handlerSignalingSemaphore.acquire(); + + client.newRequest("localhost", connector.getLocalPort()) + .path("/block") + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + if (result.isSucceeded()) + { + latch1.countDown(); + latch2.countDown(); + } + }); + + // wait for both requests to start being serviced + handlerSignalingSemaphore.acquire(); + + assertThat(poolCreateCounter.get(), is(1)); + + // finalize 1 request, freeing up 1 multiplexing slot + handlerWaitingSemaphore.release(); + // wait until 1st request finished + assertTrue(latch1.await(5, TimeUnit.SECONDS)); + + assertThat(poolRef.get().getInUseCount(), is(1)); + assertThat(poolRef.get().getIdleCount(), is(0)); + assertThat(poolRef.get().getClosedCount(), is(0)); + assertThat(poolRef.get().size(), is(1)); + + // wait for the connection to expire + Thread.sleep(maxDuration + 500); + + // send a 3rd request that will close the expired multiplexed connection + ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) + .path("/do-not-block") + .timeout(5, TimeUnit.SECONDS) + .send(); + assertThat(response.getStatus(), is(200)); + + assertThat(poolRef.get().getInUseCount(), is(0)); + assertThat(poolRef.get().getIdleCount(), is(1)); + assertThat(poolRef.get().getClosedCount(), is(1)); + assertThat(poolRef.get().size(), is(2)); + + // unblock 2nd request + handlerWaitingSemaphore.release(); + //wait until 2nd request finished + assertTrue(latch2.await(5, TimeUnit.SECONDS)); + + assertThat(poolRef.get().getInUseCount(), is(0)); + assertThat(poolRef.get().getIdleCount(), is(1)); + assertThat(poolRef.get().getClosedCount(), is(0)); + assertThat(poolRef.get().size(), is(1)); + assertThat(poolCreateCounter.get(), is(2)); + + // wait for idle connections to be closed + Thread.sleep(maxIdle + 500); + + assertThat(poolRef.get().getIdleCount(), is(0)); + assertThat(poolRef.get().size(), is(0)); + assertThat(poolRemoveCounter.get(), is(3)); + } + + private static class ConnectionPoolFactory + { + private final String name; + private final ConnectionPool.Factory factory; + + private ConnectionPoolFactory(String name, ConnectionPool.Factory factory) + { + this.name = name; + this.factory = factory; + } + + @Override + public String toString() + { + return name; + } + } +} diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java index dd705fb2fd0..d6b49377730 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Pool.java @@ -149,6 +149,11 @@ public class Pool implements AutoCloseable, Dumpable return (int)entries.stream().filter(Entry::isInUse).count(); } + public int getClosedCount() + { + return (int)entries.stream().filter(Entry::isClosed).count(); + } + public int getMaxEntries() { return maxEntries; @@ -627,8 +632,9 @@ public class Pool implements AutoCloseable, Dumpable } /** - * Try to mark the entry as removed. - * @return true if the entry has to be removed from the containing pool, false otherwise. + * Try to remove the entry by marking it as closed and decrementing the multiplexing counter. + * The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed. + * @return true if the entry can be removed from the containing pool, false otherwise. */ boolean tryRemove() { diff --git a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java index 52abccdde88..719ae61db15 100644 --- a/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java +++ b/jetty-util/src/test/java/org/eclipse/jetty/util/PoolTest.java @@ -24,7 +24,6 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.stream.Stream; @@ -52,7 +51,24 @@ public class PoolTest { interface Factory { - Pool getPool(int maxSize); + Pool getPool(int maxSize); + } + + private static class CloseableHolder implements Closeable + { + private boolean closed; + private final String value; + + public CloseableHolder(String value) + { + this.value = value; + } + + @Override + public void close() + { + closed = true; + } } public static Stream strategy() @@ -69,15 +85,15 @@ public class PoolTest @MethodSource(value = "strategy") public void testAcquireRelease(Factory factory) { - Pool pool = factory.getPool(1); - pool.reserve().enable("aaa", false); + Pool pool = factory.getPool(1); + pool.reserve(-1).enable(new CloseableHolder("aaa"), false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(0)); - Pool.Entry e1 = pool.acquire(); - assertThat(e1.getPooled(), equalTo("aaa")); + Pool.Entry e1 = pool.acquire(); + assertThat(e1.getPooled().value, equalTo("aaa")); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(0)); @@ -93,8 +109,8 @@ public class PoolTest assertThrows(IllegalStateException.class, e1::release); - Pool.Entry e2 = pool.acquire(); - assertThat(e2.getPooled(), equalTo("aaa")); + Pool.Entry e2 = pool.acquire(); + assertThat(e2.getPooled().value, equalTo("aaa")); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(0)); @@ -113,10 +129,10 @@ public class PoolTest @MethodSource(value = "strategy") public void testRemoveBeforeRelease(Factory factory) { - Pool pool = factory.getPool(1); - pool.reserve().enable("aaa", false); + Pool pool = factory.getPool(1); + pool.reserve(-1).enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); assertThat(pool.remove(e1), is(false)); assertThat(pool.release(e1), is(false)); @@ -126,21 +142,22 @@ public class PoolTest @MethodSource(value = "strategy") public void testCloseBeforeRelease(Factory factory) { - Pool pool = factory.getPool(1); - pool.reserve().enable("aaa", false); + Pool pool = factory.getPool(1); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.size(), is(1)); pool.close(); assertThat(pool.size(), is(0)); assertThat(pool.release(e1), is(false)); + assertThat(e1.getPooled().closed, is(true)); } @ParameterizedTest @MethodSource(value = "strategy") public void testMaxPoolSize(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); assertThat(pool.size(), is(0)); assertThat(pool.reserve(), notNullValue()); assertThat(pool.size(), is(1)); @@ -152,25 +169,25 @@ public class PoolTest @MethodSource(value = "strategy") public void testReserve(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); pool.setMaxMultiplex(2); // Reserve an entry - Pool.Entry e1 = pool.reserve(); + Pool.Entry e1 = pool.reserve(); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(1)); assertThat(pool.getIdleCount(), is(0)); assertThat(pool.getInUseCount(), is(0)); // enable the entry - e1.enable("aaa", false); + e1.enable(new CloseableHolder("aaa"), false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(0)); // Reserve another entry - Pool.Entry e2 = pool.reserve(); + Pool.Entry e2 = pool.reserve(); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(1)); assertThat(pool.getIdleCount(), is(1)); @@ -184,35 +201,35 @@ public class PoolTest assertThat(pool.getInUseCount(), is(0)); // Reserve another entry - Pool.Entry e3 = pool.reserve(); + Pool.Entry e3 = pool.reserve(); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(1)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(0)); // enable and acquire the entry - e3.enable("bbb", true); + e3.enable(new CloseableHolder("bbb"), true); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(1)); // can't reenable - assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false)); + assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false)); // Can't enable acquired entry - Pool.Entry e = pool.acquire(); - assertThrows(IllegalStateException.class, () -> e.enable("xxx", false)); + Pool.Entry e = pool.acquire(); + assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); } @ParameterizedTest @MethodSource(value = "strategy") public void testDeprecatedReserve(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); // Reserve an entry - Pool.Entry e1 = pool.reserve(-1); + Pool.Entry e1 = pool.reserve(-1); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(1)); assertThat(pool.getIdleCount(), is(0)); @@ -226,14 +243,14 @@ public class PoolTest assertThat(pool.getInUseCount(), is(0)); // enable the entry - e1.enable("aaa", false); + e1.enable(new CloseableHolder("aaa"), false); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(0)); // Reserve another entry - Pool.Entry e2 = pool.reserve(-1); + Pool.Entry e2 = pool.reserve(-1); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(1)); assertThat(pool.getIdleCount(), is(1)); @@ -247,32 +264,32 @@ public class PoolTest assertThat(pool.getInUseCount(), is(0)); // Reserve another entry - Pool.Entry e3 = pool.reserve(-1); + Pool.Entry e3 = pool.reserve(-1); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(1)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(0)); // enable and acquire the entry - e3.enable("bbb", true); + e3.enable(new CloseableHolder("bbb"), true); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getIdleCount(), is(1)); assertThat(pool.getInUseCount(), is(1)); // can't reenable - assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false)); + assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false)); // Can't enable acquired entry - Pool.Entry e = pool.acquire(); - assertThrows(IllegalStateException.class, () -> e.enable("xxx", false)); + Pool.Entry e = pool.acquire(); + assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false)); } @ParameterizedTest @MethodSource(value = "strategy") public void testReserveNegativeMaxPending(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); assertThat(pool.reserve(), notNullValue()); assertThat(pool.reserve(), notNullValue()); assertThat(pool.reserve(), nullValue()); @@ -282,8 +299,9 @@ public class PoolTest @MethodSource(value = "strategy") public void testClose(Factory factory) { - Pool pool = factory.getPool(1); - pool.reserve().enable("aaa", false); + Pool pool = factory.getPool(1); + CloseableHolder holder = new CloseableHolder("aaa"); + pool.reserve().enable(holder, false); assertThat(pool.isClosed(), is(false)); pool.close(); pool.close(); @@ -292,28 +310,17 @@ public class PoolTest assertThat(pool.size(), is(0)); assertThat(pool.acquire(), nullValue()); assertThat(pool.reserve(), nullValue()); - } - - @Test - public void testClosingCloseable() - { - AtomicBoolean closed = new AtomicBoolean(); - Pool pool = new Pool<>(FIRST, 1); - Closeable pooled = () -> closed.set(true); - pool.reserve().enable(pooled, false); - assertThat(closed.get(), is(false)); - pool.close(); - assertThat(closed.get(), is(true)); + assertThat(holder.closed, is(true)); } @ParameterizedTest @MethodSource(value = "strategy") public void testRemove(Factory factory) { - Pool pool = factory.getPool(1); - pool.reserve().enable("aaa", false); + Pool pool = factory.getPool(1); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); assertThat(pool.remove(e1), is(false)); assertThat(pool.release(e1), is(false)); @@ -325,13 +332,13 @@ public class PoolTest @MethodSource(value = "strategy") public void testValuesSize(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); assertThat(pool.size(), is(0)); assertThat(pool.values().isEmpty(), is(true)); - pool.reserve().enable("aaa", false); - pool.reserve().enable("bbb", false); - assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb"))); + pool.reserve().enable(new CloseableHolder("aaa"), false); + pool.reserve().enable(new CloseableHolder("bbb"), false); + assertThat(pool.values().stream().map(Pool.Entry::getPooled).map(closeableHolder -> closeableHolder.value).collect(toList()), equalTo(Arrays.asList("aaa", "bbb"))); assertThat(pool.size(), is(2)); } @@ -339,10 +346,10 @@ public class PoolTest @MethodSource(value = "strategy") public void testValuesContainsAcquiredEntries(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); - pool.reserve().enable("aaa", false); - pool.reserve().enable("bbb", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); + pool.reserve().enable(new CloseableHolder("bbb"), false); assertThat(pool.acquire(), notNullValue()); assertThat(pool.acquire(), notNullValue()); assertThat(pool.acquire(), nullValue()); @@ -353,10 +360,10 @@ public class PoolTest @MethodSource(value = "strategy") public void testAcquireAt(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); - pool.reserve(-1).enable("aaa", false); - pool.reserve(-1).enable("bbb", false); + pool.reserve(-1).enable(new CloseableHolder("aaa"), false); + pool.reserve(-1).enable(new CloseableHolder("bbb"), false); assertThat(pool.acquireAt(2), nullValue()); assertThat(pool.acquireAt(0), notNullValue()); @@ -369,11 +376,11 @@ public class PoolTest @MethodSource(value = "strategy") public void testMaxUsageCount(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxUsageCount(3); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.release(e1), is(true)); e1 = pool.acquire(); assertThat(pool.release(e1), is(true)); @@ -384,7 +391,7 @@ public class PoolTest assertThat(pool.remove(e1), is(true)); assertThat(pool.remove(e1), is(false)); assertThat(pool.size(), is(0)); - Pool.Entry e1Copy = e1; + Pool.Entry e1Copy = e1; assertThat(pool.release(e1Copy), is(false)); } @@ -392,7 +399,7 @@ public class PoolTest @MethodSource(value = "strategy") public void testMaxMultiplex(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); pool.setMaxMultiplex(3); Map counts = new HashMap<>(); @@ -400,21 +407,21 @@ public class PoolTest AtomicInteger b = new AtomicInteger(); counts.put("a", a); counts.put("b", b); - pool.reserve().enable("a", false); - pool.reserve().enable("b", false); + pool.reserve().enable(new CloseableHolder("a"), false); + pool.reserve().enable(new CloseableHolder("b"), false); - counts.get(pool.acquire().getPooled()).incrementAndGet(); - counts.get(pool.acquire().getPooled()).incrementAndGet(); - counts.get(pool.acquire().getPooled()).incrementAndGet(); - counts.get(pool.acquire().getPooled()).incrementAndGet(); + counts.get(pool.acquire().getPooled().value).incrementAndGet(); + counts.get(pool.acquire().getPooled().value).incrementAndGet(); + counts.get(pool.acquire().getPooled().value).incrementAndGet(); + counts.get(pool.acquire().getPooled().value).incrementAndGet(); assertThat(a.get(), greaterThan(0)); assertThat(a.get(), lessThanOrEqualTo(3)); assertThat(b.get(), greaterThan(0)); assertThat(b.get(), lessThanOrEqualTo(3)); - counts.get(pool.acquire().getPooled()).incrementAndGet(); - counts.get(pool.acquire().getPooled()).incrementAndGet(); + counts.get(pool.acquire().getPooled().value).incrementAndGet(); + counts.get(pool.acquire().getPooled().value).incrementAndGet(); assertThat(a.get(), is(3)); assertThat(b.get(), is(3)); @@ -426,13 +433,13 @@ public class PoolTest @MethodSource(value = "strategy") public void testRemoveMultiplexed(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(e1, notNullValue()); - Pool.Entry e2 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); assertThat(e2, notNullValue()); assertThat(e2, sameInstance(e1)); assertThat(e2.getUsageCount(), is(2)); @@ -456,12 +463,12 @@ public class PoolTest @MethodSource(value = "strategy") public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); - Pool.Entry e2 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); assertThat(pool.remove(e1), is(false)); assertThat(e1.isClosed(), is(true)); @@ -474,11 +481,11 @@ public class PoolTest @MethodSource(value = "strategy") public void testNonMultiplexRemoveAfterAcquire(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.remove(e1), is(true)); assertThat(pool.size(), is(0)); } @@ -487,12 +494,12 @@ public class PoolTest @MethodSource(value = "strategy") public void testMultiplexRemoveAfterAcquire(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); - Pool.Entry e2 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); assertThat(pool.remove(e1), is(false)); assertThat(pool.remove(e2), is(true)); @@ -501,7 +508,7 @@ public class PoolTest assertThat(pool.release(e1), is(false)); assertThat(pool.size(), is(0)); - Pool.Entry e3 = pool.acquire(); + Pool.Entry e3 = pool.acquire(); assertThat(e3, nullValue()); assertThat(pool.release(e2), is(false)); @@ -512,8 +519,8 @@ public class PoolTest @MethodSource(value = "strategy") public void testReleaseThenRemoveNonEnabledEntry(Factory factory) { - Pool pool = factory.getPool(1); - Pool.Entry e = pool.reserve(); + Pool pool = factory.getPool(1); + Pool.Entry e = pool.reserve(); assertThat(pool.size(), is(1)); assertThat(pool.release(e), is(false)); assertThat(pool.size(), is(1)); @@ -525,8 +532,8 @@ public class PoolTest @MethodSource(value = "strategy") public void testRemoveNonEnabledEntry(Factory factory) { - Pool pool = factory.getPool(1); - Pool.Entry e = pool.reserve(); + Pool pool = factory.getPool(1); + Pool.Entry e = pool.reserve(); assertThat(pool.size(), is(1)); assertThat(pool.remove(e), is(true)); assertThat(pool.size(), is(0)); @@ -536,16 +543,16 @@ public class PoolTest @MethodSource(value = "strategy") public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(3); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e0 = pool.acquire(); + Pool.Entry e0 = pool.acquire(); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.release(e1), is(true)); - Pool.Entry e2 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); assertThat(pool.release(e2), is(true)); assertThat(pool.acquire(), nullValue()); @@ -557,16 +564,16 @@ public class PoolTest @MethodSource(value = "strategy") public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(3); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e0 = pool.acquire(); + Pool.Entry e0 = pool.acquire(); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(pool.release(e1), is(true)); - Pool.Entry e2 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); assertThat(pool.release(e2), is(true)); assertThat(pool.acquire(), nullValue()); @@ -582,14 +589,14 @@ public class PoolTest @MethodSource(value = "strategy") public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory) { - Pool pool = factory.getPool(1); + Pool pool = factory.getPool(1); pool.setMaxMultiplex(2); pool.setMaxUsageCount(10); - pool.reserve().enable("aaa", false); + pool.reserve().enable(new CloseableHolder("aaa"), false); - Pool.Entry e1 = pool.acquire(); + Pool.Entry e1 = pool.acquire(); assertThat(e1.getUsageCount(), is(1)); - Pool.Entry e2 = pool.acquire(); + Pool.Entry e2 = pool.acquire(); assertThat(e2, sameInstance(e1)); assertThat(e1.getUsageCount(), is(2)); assertThat(pool.acquire(), nullValue()); @@ -600,58 +607,61 @@ public class PoolTest @MethodSource(value = "strategy") public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory) { - Pool pool = factory.getPool(1); - Pool.Entry entry = pool.reserve(); - entry.enable("aaa", false); + Pool pool = factory.getPool(1); + Pool.Entry entry = pool.reserve(); + entry.enable(new CloseableHolder("aaa"), false); entry.setUsageCount(Integer.MAX_VALUE); - Pool.Entry acquired1 = pool.acquire(); + Pool.Entry acquired1 = pool.acquire(); assertThat(acquired1, notNullValue()); assertThat(pool.release(acquired1), is(true)); pool.setMaxUsageCount(1); - Pool.Entry acquired2 = pool.acquire(); + Pool.Entry acquired2 = pool.acquire(); assertThat(acquired2, nullValue()); + assertThat(entry.getPooled().closed, is(true)); } @ParameterizedTest @MethodSource(value = "strategy") public void testDynamicMaxUsageCountChangeSweep(Factory factory) { - Pool pool = factory.getPool(2); - Pool.Entry entry1 = pool.reserve(); - entry1.enable("aaa", false); - Pool.Entry entry2 = pool.reserve(); - entry2.enable("bbb", false); + Pool pool = factory.getPool(2); + Pool.Entry entry1 = pool.reserve(); + entry1.enable(new CloseableHolder("aaa"), false); + Pool.Entry entry2 = pool.reserve(); + entry2.enable(new CloseableHolder("bbb"), false); - Pool.Entry acquired1 = pool.acquire(); + Pool.Entry acquired1 = pool.acquire(); assertThat(acquired1, notNullValue()); assertThat(pool.release(acquired1), is(true)); + assertThat(pool.size(), is(2)); pool.setMaxUsageCount(1); assertThat(pool.size(), is(1)); + assertThat(entry1.getPooled().closed ^ entry2.getPooled().closed, is(true)); } @Test public void testConfigLimits() { - assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(0)); - assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(-1)); - assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxUsageCount(0)); + assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(0)); + assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxMultiplex(-1)); + assertThrows(IllegalArgumentException.class, () -> new Pool(FIRST, 1).setMaxUsageCount(0)); } @ParameterizedTest @MethodSource(value = "strategy") public void testAcquireWithCreator(Factory factory) { - Pool pool = factory.getPool(2); + Pool pool = factory.getPool(2); assertThat(pool.size(), is(0)); assertThat(pool.acquire(e -> null), nullValue()); assertThat(pool.size(), is(0)); - Pool.Entry e1 = pool.acquire(e -> "e1"); - assertThat(e1.getPooled(), is("e1")); + Pool.Entry e1 = pool.acquire(e -> new CloseableHolder("e1")); + assertThat(e1.getPooled().value, is("e1")); assertThat(pool.size(), is(1)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getInUseCount(), is(1)); @@ -659,13 +669,13 @@ public class PoolTest assertThat(pool.acquire(e -> null), nullValue()); assertThat(pool.size(), is(1)); - Pool.Entry e2 = pool.acquire(e -> "e2"); - assertThat(e2.getPooled(), is("e2")); + Pool.Entry e2 = pool.acquire(e -> new CloseableHolder("e2")); + assertThat(e2.getPooled().value, is("e2")); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getInUseCount(), is(2)); - Pool.Entry e3 = pool.acquire(e -> "e3"); + Pool.Entry e3 = pool.acquire(e -> new CloseableHolder("e3")); assertThat(e3, nullValue()); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(0)); @@ -681,8 +691,8 @@ public class PoolTest assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getInUseCount(), is(1)); - Pool.Entry e4 = pool.acquire(e -> "e4"); - assertThat(e4.getPooled(), is("e2")); + Pool.Entry e4 = pool.acquire(e -> new CloseableHolder("e4")); + assertThat(e4.getPooled().value, is("e2")); assertThat(pool.size(), is(2)); assertThat(pool.getReservedCount(), is(0)); assertThat(pool.getInUseCount(), is(2));