Merge remote-tracking branch origin/jetty-9.4.x into jetty-10.0.x
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
commit
058a488c30
|
@ -18,14 +18,17 @@ import java.util.ArrayDeque;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collection;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.util.Attachable;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.IO;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
|
@ -48,6 +51,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
private final Callback requester;
|
||||
private final Pool<Connection> pool;
|
||||
private boolean maximizeConnections;
|
||||
private volatile long maxDurationNanos = 0L;
|
||||
|
||||
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
|
@ -90,6 +94,27 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Get the max usage duration in milliseconds of the pool's connections.
|
||||
* Values {@code 0} and negative mean that there is no limit.</p>
|
||||
* <p>This only guarantees that a connection cannot be acquired after the configured
|
||||
* duration elapses, so that is only enforced when {@link #acquire(boolean)} is called.
|
||||
* If a pool stays completely idle for a duration longer than the value
|
||||
* returned by this method, the max duration will not be enforced.
|
||||
* It's up to the idle timeout mechanism (see {@link HttpClient#getIdleTimeout()})
|
||||
* to handle closing idle connections.</p>
|
||||
*/
|
||||
@ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed")
|
||||
public long getMaxDuration()
|
||||
{
|
||||
return TimeUnit.NANOSECONDS.toMillis(maxDurationNanos);
|
||||
}
|
||||
|
||||
public void setMaxDuration(long maxDurationInMs)
|
||||
{
|
||||
this.maxDurationNanos = TimeUnit.MILLISECONDS.toNanos(maxDurationInMs);
|
||||
}
|
||||
|
||||
protected int getMaxMultiplex()
|
||||
{
|
||||
return pool.getMaxMultiplex();
|
||||
|
@ -257,7 +282,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
{
|
||||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Pool<Connection>.Entry entry = pool.reserve(-1);
|
||||
Pool<Connection>.Entry entry = pool.reserve();
|
||||
if (entry == null)
|
||||
return false;
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -276,18 +301,37 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
}
|
||||
|
||||
protected Connection activate()
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
Pool<Connection>.Entry entry = pool.acquire();
|
||||
if (entry != null)
|
||||
{
|
||||
Connection connection = entry.getPooled();
|
||||
|
||||
long maxDurationNanos = this.maxDurationNanos;
|
||||
if (maxDurationNanos > 0L)
|
||||
{
|
||||
EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment();
|
||||
if (holder.isExpired(maxDurationNanos))
|
||||
{
|
||||
boolean canClose = remove(connection, true);
|
||||
if (canClose)
|
||||
IO.close(connection);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Connection removed{} due to expiration {} {}", (canClose ? " and closed" : ""), entry, pool);
|
||||
continue;
|
||||
}
|
||||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Activated {} {}", entry, pool);
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isActive(Connection connection)
|
||||
|
@ -295,11 +339,10 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
EntryHolder holder = (EntryHolder)attachable.getAttachment();
|
||||
if (holder == null)
|
||||
return false;
|
||||
return !entry.isIdle();
|
||||
return !holder.entry.isIdle();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -316,13 +359,12 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
EntryHolder holder = (EntryHolder)attachable.getAttachment();
|
||||
if (holder == null)
|
||||
return true;
|
||||
boolean reusable = pool.release(entry);
|
||||
boolean reusable = pool.release(holder.entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
|
||||
LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool);
|
||||
if (reusable)
|
||||
return true;
|
||||
remove(connection);
|
||||
|
@ -340,14 +382,14 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
if (!(connection instanceof Attachable))
|
||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||
Attachable attachable = (Attachable)connection;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
||||
if (entry == null)
|
||||
EntryHolder holder = (EntryHolder)attachable.getAttachment();
|
||||
if (holder == null)
|
||||
return false;
|
||||
boolean removed = pool.remove(holder.entry);
|
||||
if (removed)
|
||||
attachable.setAttachment(null);
|
||||
boolean removed = pool.remove(entry);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
|
||||
LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool);
|
||||
if (removed || force)
|
||||
{
|
||||
released(connection);
|
||||
|
@ -410,9 +452,11 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
@Override
|
||||
public boolean sweep()
|
||||
{
|
||||
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
|
||||
pool.values().stream()
|
||||
.map(Pool.Entry::getPooled)
|
||||
.filter(connection -> connection instanceof Sweeper.Sweepable)
|
||||
.forEach(connection ->
|
||||
{
|
||||
Connection connection = entry.getPooled();
|
||||
if (((Sweeper.Sweepable)connection).sweep())
|
||||
{
|
||||
boolean removed = remove(connection);
|
||||
|
@ -457,7 +501,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
|
||||
if (connection instanceof Attachable)
|
||||
{
|
||||
((Attachable)connection).setAttachment(reserved);
|
||||
((Attachable)connection).setAttachment(new EntryHolder(reserved));
|
||||
onCreated(connection);
|
||||
pending.decrementAndGet();
|
||||
reserved.enable(connection, false);
|
||||
|
@ -484,4 +528,20 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
|||
requester.failed(x);
|
||||
}
|
||||
}
|
||||
|
||||
private static class EntryHolder
|
||||
{
|
||||
private final Pool<Connection>.Entry entry;
|
||||
private final long creationTimestamp = System.nanoTime();
|
||||
|
||||
private EntryHolder(Pool<Connection>.Entry entry)
|
||||
{
|
||||
this.entry = Objects.requireNonNull(entry);
|
||||
}
|
||||
|
||||
private boolean isExpired(long timeoutNanos)
|
||||
{
|
||||
return System.nanoTime() - creationTimestamp >= timeoutNanos;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -21,12 +21,14 @@ import java.util.concurrent.CountDownLatch;
|
|||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.IntStream;
|
||||
import java.util.stream.Stream;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.client.api.Request;
|
||||
|
@ -47,6 +49,7 @@ import org.eclipse.jetty.util.SocketAddressResolver;
|
|||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
|
@ -70,6 +73,12 @@ public class ConnectionPoolTest
|
|||
{
|
||||
return Stream.of(
|
||||
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
|
||||
new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
|
||||
pool.setMaxDuration(10);
|
||||
return pool;
|
||||
}),
|
||||
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
|
||||
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
|
||||
);
|
||||
|
@ -438,6 +447,116 @@ public class ConnectionPoolTest
|
|||
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxDurationConnectionsWithConstrainedPool() throws Exception
|
||||
{
|
||||
// ConnectionPool may NOT open more connections than expected because
|
||||
// it is constrained to a single connection in this test.
|
||||
|
||||
final int maxConnections = 1;
|
||||
final int maxDuration = 30;
|
||||
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
// Constrain the max pool size to 1.
|
||||
DuplexConnectionPool pool = new DuplexConnectionPool(destination, maxConnections, destination)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
poolCreateCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removed(Connection connection)
|
||||
{
|
||||
poolRemoveCounter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
pool.setMaxDuration(maxDuration);
|
||||
return pool;
|
||||
});
|
||||
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
|
||||
transport.setConnectionPoolFactory(factory.factory);
|
||||
client = new HttpClient(transport);
|
||||
client.start();
|
||||
|
||||
// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
assertThat(response.getStatus(), Matchers.is(200));
|
||||
|
||||
Thread.sleep(maxDuration * 2);
|
||||
}
|
||||
|
||||
// Check that the pool created 5 and removed 4 connections;
|
||||
// it must be exactly 4 removed b/c each cycle of the loop
|
||||
// can only open 1 connection as the pool is constrained to
|
||||
// maximum 1 connection.
|
||||
assertThat(poolCreateCounter.get(), Matchers.is(5));
|
||||
assertThat(poolRemoveCounter.get(), Matchers.is(4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxDurationConnectionsWithUnconstrainedPool() throws Exception
|
||||
{
|
||||
// ConnectionPools may open a few more connections than expected.
|
||||
|
||||
final int maxDuration = 30;
|
||||
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
poolCreateCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removed(Connection connection)
|
||||
{
|
||||
poolRemoveCounter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
pool.setMaxDuration(maxDuration);
|
||||
return pool;
|
||||
});
|
||||
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
|
||||
transport.setConnectionPoolFactory(factory.factory);
|
||||
client = new HttpClient(transport);
|
||||
client.start();
|
||||
|
||||
// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
assertThat(response.getStatus(), Matchers.is(200));
|
||||
|
||||
Thread.sleep(maxDuration * 2);
|
||||
}
|
||||
|
||||
// Check that the pool created 5 and removed at least 4 connections;
|
||||
// it can be more than 4 removed b/c each cycle of the loop may
|
||||
// open more than 1 connection as the pool is not constrained.
|
||||
assertThat(poolCreateCounter.get(), Matchers.is(5));
|
||||
assertThat(poolRemoveCounter.get(), Matchers.greaterThanOrEqualTo(4));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource("pools")
|
||||
public void testConnectionMaxUsage(ConnectionPoolFactory factory) throws Exception
|
||||
|
|
|
@ -0,0 +1,309 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.http2.client.http;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.Semaphore;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import javax.servlet.ServletException;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
||||
import org.eclipse.jetty.client.ConnectionPool;
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.client.HttpClientTransport;
|
||||
import org.eclipse.jetty.client.MultiplexConnectionPool;
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.http2.client.HTTP2Client;
|
||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.server.Server;
|
||||
import org.eclipse.jetty.server.ServerConnector;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.AfterEach;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
// Sibling of ConnectionPoolTest, but using H2 to multiplex connections.
|
||||
public class MultiplexedConnectionPoolTest
|
||||
{
|
||||
private static final int MAX_MULTIPLEX = 2;
|
||||
|
||||
private Server server;
|
||||
private ServerConnector connector;
|
||||
private HttpClient client;
|
||||
|
||||
private void startServer(Handler handler) throws Exception
|
||||
{
|
||||
server = new Server();
|
||||
HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
|
||||
http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX);
|
||||
connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory);
|
||||
server.addConnector(connector);
|
||||
server.setHandler(handler);
|
||||
server.start();
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void disposeServer() throws Exception
|
||||
{
|
||||
connector = null;
|
||||
if (server != null)
|
||||
{
|
||||
server.stop();
|
||||
server = null;
|
||||
}
|
||||
}
|
||||
|
||||
@AfterEach
|
||||
public void disposeClient() throws Exception
|
||||
{
|
||||
if (client != null)
|
||||
{
|
||||
client.stop();
|
||||
client = null;
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
|
||||
{
|
||||
final int maxDuration = 30;
|
||||
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
|
||||
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
|
||||
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
|
||||
poolRef.set(pool);
|
||||
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
poolCreateCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removed(Connection connection)
|
||||
{
|
||||
poolRemoveCounter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
connectionPool.setMaxDuration(maxDuration);
|
||||
return connectionPool;
|
||||
});
|
||||
|
||||
startServer(new EmptyServerHandler());
|
||||
|
||||
HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
|
||||
transport.setConnectionPoolFactory(factory.factory);
|
||||
client = new HttpClient(transport);
|
||||
client.start();
|
||||
|
||||
// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
|
||||
for (int i = 0; i < 5; i++)
|
||||
{
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
assertThat(response.getStatus(), Matchers.is(200));
|
||||
|
||||
// Check that the pool never grows above 1.
|
||||
assertThat(poolRef.get().size(), is(1));
|
||||
|
||||
Thread.sleep(maxDuration * 2);
|
||||
}
|
||||
|
||||
// Check that the pool created 5 and removed 4 connections;
|
||||
// it must be exactly 4 removed b/c while the pool is not
|
||||
// constrained, it can multiplex requests on a single connection
|
||||
// so that should prevent opening more connections than needed.
|
||||
assertThat(poolCreateCounter.get(), is(5));
|
||||
assertThat(poolRemoveCounter.get(), is(4));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnectionWhileStillInUse() throws Exception
|
||||
{
|
||||
final int maxDuration = 1000;
|
||||
final int maxIdle = 2000;
|
||||
|
||||
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
|
||||
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||
{
|
||||
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
|
||||
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
|
||||
poolRef.set(pool);
|
||||
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
|
||||
{
|
||||
@Override
|
||||
protected void onCreated(Connection connection)
|
||||
{
|
||||
poolCreateCounter.incrementAndGet();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void removed(Connection connection)
|
||||
{
|
||||
poolRemoveCounter.incrementAndGet();
|
||||
}
|
||||
};
|
||||
connectionPool.setMaxDuration(maxDuration);
|
||||
return connectionPool;
|
||||
});
|
||||
|
||||
Semaphore handlerSignalingSemaphore = new Semaphore(0);
|
||||
Semaphore handlerWaitingSemaphore = new Semaphore(0);
|
||||
startServer(new EmptyServerHandler()
|
||||
{
|
||||
@Override
|
||||
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
|
||||
{
|
||||
if (!target.equals("/block"))
|
||||
return;
|
||||
|
||||
handlerSignalingSemaphore.release();
|
||||
|
||||
try
|
||||
{
|
||||
handlerWaitingSemaphore.acquire();
|
||||
}
|
||||
catch (Exception e)
|
||||
{
|
||||
throw new ServletException(e);
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
|
||||
transport.setConnectionPoolFactory(factory.factory);
|
||||
client = new HttpClient(transport);
|
||||
client.setIdleTimeout(maxIdle);
|
||||
client.start();
|
||||
|
||||
CountDownLatch latch1 = new CountDownLatch(1);
|
||||
CountDownLatch latch2 = new CountDownLatch(2);
|
||||
// create 2 requests that are going to consume all the multiplexing slots
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/block")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isSucceeded())
|
||||
{
|
||||
latch1.countDown();
|
||||
latch2.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// wait for the 1st request to be serviced to make sure only 1 connection gets created
|
||||
handlerSignalingSemaphore.acquire();
|
||||
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/block")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send(result ->
|
||||
{
|
||||
if (result.isSucceeded())
|
||||
{
|
||||
latch1.countDown();
|
||||
latch2.countDown();
|
||||
}
|
||||
});
|
||||
|
||||
// wait for both requests to start being serviced
|
||||
handlerSignalingSemaphore.acquire();
|
||||
|
||||
assertThat(poolCreateCounter.get(), is(1));
|
||||
|
||||
// finalize 1 request, freeing up 1 multiplexing slot
|
||||
handlerWaitingSemaphore.release();
|
||||
// wait until 1st request finished
|
||||
assertTrue(latch1.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertThat(poolRef.get().getInUseCount(), is(1));
|
||||
assertThat(poolRef.get().getIdleCount(), is(0));
|
||||
assertThat(poolRef.get().getClosedCount(), is(0));
|
||||
assertThat(poolRef.get().size(), is(1));
|
||||
|
||||
// wait for the connection to expire
|
||||
Thread.sleep(maxDuration + 500);
|
||||
|
||||
// send a 3rd request that will close the expired multiplexed connection
|
||||
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/do-not-block")
|
||||
.timeout(5, TimeUnit.SECONDS)
|
||||
.send();
|
||||
assertThat(response.getStatus(), is(200));
|
||||
|
||||
assertThat(poolRef.get().getInUseCount(), is(0));
|
||||
assertThat(poolRef.get().getIdleCount(), is(1));
|
||||
assertThat(poolRef.get().getClosedCount(), is(1));
|
||||
assertThat(poolRef.get().size(), is(2));
|
||||
|
||||
// unblock 2nd request
|
||||
handlerWaitingSemaphore.release();
|
||||
//wait until 2nd request finished
|
||||
assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||
|
||||
assertThat(poolRef.get().getInUseCount(), is(0));
|
||||
assertThat(poolRef.get().getIdleCount(), is(1));
|
||||
assertThat(poolRef.get().getClosedCount(), is(0));
|
||||
assertThat(poolRef.get().size(), is(1));
|
||||
assertThat(poolCreateCounter.get(), is(2));
|
||||
|
||||
// wait for idle connections to be closed
|
||||
Thread.sleep(maxIdle + 500);
|
||||
|
||||
assertThat(poolRef.get().getIdleCount(), is(0));
|
||||
assertThat(poolRef.get().size(), is(0));
|
||||
assertThat(poolRemoveCounter.get(), is(3));
|
||||
}
|
||||
|
||||
private static class ConnectionPoolFactory
|
||||
{
|
||||
private final String name;
|
||||
private final ConnectionPool.Factory factory;
|
||||
|
||||
private ConnectionPoolFactory(String name, ConnectionPool.Factory factory)
|
||||
{
|
||||
this.name = name;
|
||||
this.factory = factory;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return name;
|
||||
}
|
||||
}
|
||||
}
|
|
@ -144,6 +144,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
return (int)entries.stream().filter(Entry::isInUse).count();
|
||||
}
|
||||
|
||||
public int getClosedCount()
|
||||
{
|
||||
return (int)entries.stream().filter(Entry::isClosed).count();
|
||||
}
|
||||
|
||||
public int getMaxEntries()
|
||||
{
|
||||
return maxEntries;
|
||||
|
@ -597,8 +602,9 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* Try to mark the entry as removed.
|
||||
* @return true if the entry has to be removed from the containing pool, false otherwise.
|
||||
* Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
|
||||
* The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
|
||||
* @return true if the entry can be removed from the containing pool, false otherwise.
|
||||
*/
|
||||
boolean tryRemove()
|
||||
{
|
||||
|
|
|
@ -19,7 +19,6 @@ import java.util.Arrays;
|
|||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
|
@ -47,7 +46,24 @@ public class PoolTest
|
|||
{
|
||||
interface Factory
|
||||
{
|
||||
Pool<String> getPool(int maxSize);
|
||||
Pool<CloseableHolder> getPool(int maxSize);
|
||||
}
|
||||
|
||||
private static class CloseableHolder implements Closeable
|
||||
{
|
||||
private boolean closed;
|
||||
private final String value;
|
||||
|
||||
public CloseableHolder(String value)
|
||||
{
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
closed = true;
|
||||
}
|
||||
}
|
||||
|
||||
public static Stream<Object[]> strategy()
|
||||
|
@ -64,15 +80,15 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testAcquireRelease(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve().enable("aaa", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(e1.getPooled(), equalTo("aaa"));
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(e1.getPooled().value, equalTo("aaa"));
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(0));
|
||||
|
@ -88,8 +104,8 @@ public class PoolTest
|
|||
|
||||
assertThrows(IllegalStateException.class, e1::release);
|
||||
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(e2.getPooled(), equalTo("aaa"));
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
assertThat(e2.getPooled().value, equalTo("aaa"));
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(0));
|
||||
|
@ -108,10 +124,10 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testRemoveBeforeRelease(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve().enable("aaa", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.release(e1), is(false));
|
||||
|
@ -121,21 +137,22 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testCloseBeforeRelease(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve().enable("aaa", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.size(), is(1));
|
||||
pool.close();
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.release(e1), is(false));
|
||||
assertThat(e1.getPooled().closed, is(true));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMaxPoolSize(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.reserve(), notNullValue());
|
||||
assertThat(pool.size(), is(1));
|
||||
|
@ -147,25 +164,25 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testReserve(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
pool.setMaxMultiplex(2);
|
||||
|
||||
// Reserve an entry
|
||||
Pool<String>.Entry e1 = pool.reserve();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.reserve();
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// enable the entry
|
||||
e1.enable("aaa", false);
|
||||
e1.enable(new CloseableHolder("aaa"), false);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// Reserve another entry
|
||||
Pool<String>.Entry e2 = pool.reserve();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.reserve();
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
|
@ -179,35 +196,35 @@ public class PoolTest
|
|||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// Reserve another entry
|
||||
Pool<String>.Entry e3 = pool.reserve();
|
||||
Pool<CloseableHolder>.Entry e3 = pool.reserve();
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// enable and acquire the entry
|
||||
e3.enable("bbb", true);
|
||||
e3.enable(new CloseableHolder("bbb"), true);
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(1));
|
||||
|
||||
// can't reenable
|
||||
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
|
||||
assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false));
|
||||
|
||||
// Can't enable acquired entry
|
||||
Pool<String>.Entry e = pool.acquire();
|
||||
assertThrows(IllegalStateException.class, () -> e.enable("xxx", false));
|
||||
Pool<CloseableHolder>.Entry e = pool.acquire();
|
||||
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testDeprecatedReserve(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
|
||||
// Reserve an entry
|
||||
Pool<String>.Entry e1 = pool.reserve(-1);
|
||||
Pool<CloseableHolder>.Entry e1 = pool.reserve();
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(0));
|
||||
|
@ -221,14 +238,14 @@ public class PoolTest
|
|||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// enable the entry
|
||||
e1.enable("aaa", false);
|
||||
e1.enable(new CloseableHolder("aaa"), false);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// Reserve another entry
|
||||
Pool<String>.Entry e2 = pool.reserve(-1);
|
||||
Pool<CloseableHolder>.Entry e2 = pool.reserve();
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
|
@ -242,32 +259,32 @@ public class PoolTest
|
|||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// Reserve another entry
|
||||
Pool<String>.Entry e3 = pool.reserve(-1);
|
||||
Pool<CloseableHolder>.Entry e3 = pool.reserve();
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(1));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(0));
|
||||
|
||||
// enable and acquire the entry
|
||||
e3.enable("bbb", true);
|
||||
e3.enable(new CloseableHolder("bbb"), true);
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getIdleCount(), is(1));
|
||||
assertThat(pool.getInUseCount(), is(1));
|
||||
|
||||
// can't reenable
|
||||
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
|
||||
assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false));
|
||||
|
||||
// Can't enable acquired entry
|
||||
Pool<String>.Entry e = pool.acquire();
|
||||
assertThrows(IllegalStateException.class, () -> e.enable("xxx", false));
|
||||
Pool<CloseableHolder>.Entry e = pool.acquire();
|
||||
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testReserveNegativeMaxPending(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
assertThat(pool.reserve(), notNullValue());
|
||||
assertThat(pool.reserve(), notNullValue());
|
||||
assertThat(pool.reserve(), nullValue());
|
||||
|
@ -277,8 +294,9 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testClose(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve().enable("aaa", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
CloseableHolder holder = new CloseableHolder("aaa");
|
||||
pool.reserve().enable(holder, false);
|
||||
assertThat(pool.isClosed(), is(false));
|
||||
pool.close();
|
||||
pool.close();
|
||||
|
@ -287,28 +305,17 @@ public class PoolTest
|
|||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(pool.reserve(), nullValue());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testClosingCloseable()
|
||||
{
|
||||
AtomicBoolean closed = new AtomicBoolean();
|
||||
Pool<Closeable> pool = new Pool<>(FIRST, 1);
|
||||
Closeable pooled = () -> closed.set(true);
|
||||
pool.reserve().enable(pooled, false);
|
||||
assertThat(closed.get(), is(false));
|
||||
pool.close();
|
||||
assertThat(closed.get(), is(true));
|
||||
assertThat(holder.closed, is(true));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve().enable("aaa", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.release(e1), is(false));
|
||||
|
@ -320,13 +327,13 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testValuesSize(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.values().isEmpty(), is(true));
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable("bbb", false);
|
||||
assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
pool.reserve().enable(new CloseableHolder("bbb"), false);
|
||||
assertThat(pool.values().stream().map(Pool.Entry::getPooled).map(closeableHolder -> closeableHolder.value).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
|
||||
assertThat(pool.size(), is(2));
|
||||
}
|
||||
|
||||
|
@ -334,10 +341,10 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testValuesContainsAcquiredEntries(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable("bbb", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
pool.reserve().enable(new CloseableHolder("bbb"), false);
|
||||
assertThat(pool.acquire(), notNullValue());
|
||||
assertThat(pool.acquire(), notNullValue());
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
|
@ -348,11 +355,11 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testMaxUsageCount(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
|
@ -363,7 +370,7 @@ public class PoolTest
|
|||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.size(), is(0));
|
||||
Pool<String>.Entry e1Copy = e1;
|
||||
Pool<CloseableHolder>.Entry e1Copy = e1;
|
||||
assertThat(pool.release(e1Copy), is(false));
|
||||
}
|
||||
|
||||
|
@ -371,7 +378,7 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testMaxMultiplex(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
pool.setMaxMultiplex(3);
|
||||
|
||||
Map<String, AtomicInteger> counts = new HashMap<>();
|
||||
|
@ -379,21 +386,21 @@ public class PoolTest
|
|||
AtomicInteger b = new AtomicInteger();
|
||||
counts.put("a", a);
|
||||
counts.put("b", b);
|
||||
pool.reserve().enable("a", false);
|
||||
pool.reserve().enable("b", false);
|
||||
pool.reserve().enable(new CloseableHolder("a"), false);
|
||||
pool.reserve().enable(new CloseableHolder("b"), false);
|
||||
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||
|
||||
assertThat(a.get(), greaterThan(0));
|
||||
assertThat(a.get(), lessThanOrEqualTo(3));
|
||||
assertThat(b.get(), greaterThan(0));
|
||||
assertThat(b.get(), lessThanOrEqualTo(3));
|
||||
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||
|
||||
assertThat(a.get(), is(3));
|
||||
assertThat(b.get(), is(3));
|
||||
|
@ -405,13 +412,13 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testRemoveMultiplexed(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(e1, notNullValue());
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
assertThat(e2, notNullValue());
|
||||
assertThat(e2, sameInstance(e1));
|
||||
assertThat(e2.getUsageCount(), is(2));
|
||||
|
@ -435,12 +442,12 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(e1.isClosed(), is(true));
|
||||
|
@ -453,11 +460,11 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.remove(e1), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
@ -466,12 +473,12 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexRemoveAfterAcquire(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
assertThat(pool.remove(e2), is(true));
|
||||
|
@ -480,7 +487,7 @@ public class PoolTest
|
|||
assertThat(pool.release(e1), is(false));
|
||||
assertThat(pool.size(), is(0));
|
||||
|
||||
Pool<String>.Entry e3 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e3 = pool.acquire();
|
||||
assertThat(e3, nullValue());
|
||||
|
||||
assertThat(pool.release(e2), is(false));
|
||||
|
@ -491,8 +498,8 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testReleaseThenRemoveNonEnabledEntry(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<String>.Entry e = pool.reserve();
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder>.Entry e = pool.reserve();
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.release(e), is(false));
|
||||
assertThat(pool.size(), is(1));
|
||||
|
@ -504,8 +511,8 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testRemoveNonEnabledEntry(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<String>.Entry e = pool.reserve();
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder>.Entry e = pool.reserve();
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.remove(e), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
|
@ -515,16 +522,16 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e0 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e0 = pool.acquire();
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
assertThat(pool.release(e2), is(true));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
|
||||
|
@ -536,16 +543,16 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e0 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e0 = pool.acquire();
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(pool.release(e1), is(true));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
assertThat(pool.release(e2), is(true));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
|
||||
|
@ -561,14 +568,14 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(10);
|
||||
pool.reserve().enable("aaa", false);
|
||||
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||
assertThat(e1.getUsageCount(), is(1));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||
assertThat(e2, sameInstance(e1));
|
||||
assertThat(e1.getUsageCount(), is(2));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
|
@ -579,58 +586,61 @@ public class PoolTest
|
|||
@MethodSource(value = "strategy")
|
||||
public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<String>.Entry entry = pool.reserve();
|
||||
entry.enable("aaa", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||
Pool<CloseableHolder>.Entry entry = pool.reserve();
|
||||
entry.enable(new CloseableHolder("aaa"), false);
|
||||
entry.setUsageCount(Integer.MAX_VALUE);
|
||||
|
||||
Pool<String>.Entry acquired1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry acquired1 = pool.acquire();
|
||||
assertThat(acquired1, notNullValue());
|
||||
assertThat(pool.release(acquired1), is(true));
|
||||
|
||||
pool.setMaxUsageCount(1);
|
||||
Pool<String>.Entry acquired2 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry acquired2 = pool.acquire();
|
||||
assertThat(acquired2, nullValue());
|
||||
assertThat(entry.getPooled().closed, is(true));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testDynamicMaxUsageCountChangeSweep(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<String>.Entry entry1 = pool.reserve();
|
||||
entry1.enable("aaa", false);
|
||||
Pool<String>.Entry entry2 = pool.reserve();
|
||||
entry2.enable("bbb", false);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder>.Entry entry1 = pool.reserve();
|
||||
entry1.enable(new CloseableHolder("aaa"), false);
|
||||
Pool<CloseableHolder>.Entry entry2 = pool.reserve();
|
||||
entry2.enable(new CloseableHolder("bbb"), false);
|
||||
|
||||
Pool<String>.Entry acquired1 = pool.acquire();
|
||||
Pool<CloseableHolder>.Entry acquired1 = pool.acquire();
|
||||
assertThat(acquired1, notNullValue());
|
||||
assertThat(pool.release(acquired1), is(true));
|
||||
|
||||
assertThat(pool.size(), is(2));
|
||||
pool.setMaxUsageCount(1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(entry1.getPooled().closed ^ entry2.getPooled().closed, is(true));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfigLimits()
|
||||
{
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(-1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxUsageCount(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<CloseableHolder>(FIRST, 1).setMaxMultiplex(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<CloseableHolder>(FIRST, 1).setMaxMultiplex(-1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<CloseableHolder>(FIRST, 1).setMaxUsageCount(0));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "strategy")
|
||||
public void testAcquireWithCreator(Factory factory)
|
||||
{
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.acquire(e -> null), nullValue());
|
||||
assertThat(pool.size(), is(0));
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire(e -> "e1");
|
||||
assertThat(e1.getPooled(), is("e1"));
|
||||
Pool<CloseableHolder>.Entry e1 = pool.acquire(e -> new CloseableHolder("e1"));
|
||||
assertThat(e1.getPooled().value, is("e1"));
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(1));
|
||||
|
@ -638,13 +648,13 @@ public class PoolTest
|
|||
assertThat(pool.acquire(e -> null), nullValue());
|
||||
assertThat(pool.size(), is(1));
|
||||
|
||||
Pool<String>.Entry e2 = pool.acquire(e -> "e2");
|
||||
assertThat(e2.getPooled(), is("e2"));
|
||||
Pool<CloseableHolder>.Entry e2 = pool.acquire(e -> new CloseableHolder("e2"));
|
||||
assertThat(e2.getPooled().value, is("e2"));
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(2));
|
||||
|
||||
Pool<String>.Entry e3 = pool.acquire(e -> "e3");
|
||||
Pool<CloseableHolder>.Entry e3 = pool.acquire(e -> new CloseableHolder("e3"));
|
||||
assertThat(e3, nullValue());
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
|
@ -660,8 +670,8 @@ public class PoolTest
|
|||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(1));
|
||||
|
||||
Pool<String>.Entry e4 = pool.acquire(e -> "e4");
|
||||
assertThat(e4.getPooled(), is("e2"));
|
||||
Pool<CloseableHolder>.Entry e4 = pool.acquire(e -> new CloseableHolder("e4"));
|
||||
assertThat(e4.getPooled().value, is("e2"));
|
||||
assertThat(pool.size(), is(2));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(2));
|
||||
|
|
|
@ -24,7 +24,7 @@ import java.util.Set;
|
|||
import org.eclipse.jetty.util.annotation.Name;
|
||||
import org.junit.jupiter.api.Disabled;
|
||||
|
||||
@Disabled
|
||||
@Disabled("Not a test case")
|
||||
public class TestConfiguration extends HashMap<String, Object>
|
||||
{
|
||||
public static int VALUE = 77;
|
||||
|
|
Loading…
Reference in New Issue