implement connection pool max duration
This commit is contained in:
parent
03ae75f407
commit
0dec883fda
|
@ -23,8 +23,10 @@ import java.util.ArrayDeque;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Objects;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.CompletableFuture;
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
@ -32,6 +34,7 @@ import org.eclipse.jetty.client.api.Connection;
|
||||||
import org.eclipse.jetty.client.api.Destination;
|
import org.eclipse.jetty.client.api.Destination;
|
||||||
import org.eclipse.jetty.util.Attachable;
|
import org.eclipse.jetty.util.Attachable;
|
||||||
import org.eclipse.jetty.util.Callback;
|
import org.eclipse.jetty.util.Callback;
|
||||||
|
import org.eclipse.jetty.util.IO;
|
||||||
import org.eclipse.jetty.util.Pool;
|
import org.eclipse.jetty.util.Pool;
|
||||||
import org.eclipse.jetty.util.Promise;
|
import org.eclipse.jetty.util.Promise;
|
||||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||||
|
@ -54,6 +57,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
private final Callback requester;
|
private final Callback requester;
|
||||||
private final Pool<Connection> pool;
|
private final Pool<Connection> pool;
|
||||||
private boolean maximizeConnections;
|
private boolean maximizeConnections;
|
||||||
|
private volatile long maxDurationNanos = 0L;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
|
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
|
||||||
|
@ -105,6 +109,27 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
return CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>Get the max usage duration in milliseconds of the pool's connections.
|
||||||
|
* Values {@code 0} and negative mean that there is no limit.</p>
|
||||||
|
* <p>This only guarantees that a connection cannot be acquired after the configured
|
||||||
|
* duration elapses, so that is only enforced when {@link #acquire()} is called.
|
||||||
|
* If a pool stays completely idle for a duration longer than the value
|
||||||
|
* returned by this method, the max duration will not be enforced.
|
||||||
|
* It's up to the idle timeout mechanism (see {@link HttpClient#getIdleTimeout()})
|
||||||
|
* to handle closing idle connections.</p>
|
||||||
|
*/
|
||||||
|
@ManagedAttribute(value = "The maximum duration in milliseconds a connection can be used for before it gets closed")
|
||||||
|
public long getMaxDuration()
|
||||||
|
{
|
||||||
|
return TimeUnit.NANOSECONDS.toMillis(maxDurationNanos);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxDuration(long maxDurationInMs)
|
||||||
|
{
|
||||||
|
this.maxDurationNanos = TimeUnit.MILLISECONDS.toNanos(maxDurationInMs);
|
||||||
|
}
|
||||||
|
|
||||||
protected int getMaxMultiplex()
|
protected int getMaxMultiplex()
|
||||||
{
|
{
|
||||||
return pool.getMaxMultiplex();
|
return pool.getMaxMultiplex();
|
||||||
|
@ -290,16 +315,35 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
|
|
||||||
protected Connection activate()
|
protected Connection activate()
|
||||||
{
|
{
|
||||||
Pool<Connection>.Entry entry = pool.acquire();
|
while (true)
|
||||||
if (entry != null)
|
|
||||||
{
|
{
|
||||||
if (LOG.isDebugEnabled())
|
Pool<Connection>.Entry entry = pool.acquire();
|
||||||
LOG.debug("Activated {} {}", entry, pool);
|
if (entry != null)
|
||||||
Connection connection = entry.getPooled();
|
{
|
||||||
acquired(connection);
|
Connection connection = entry.getPooled();
|
||||||
return connection;
|
|
||||||
|
long maxDurationNanos = this.maxDurationNanos;
|
||||||
|
if (maxDurationNanos > 0L)
|
||||||
|
{
|
||||||
|
EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment();
|
||||||
|
if (holder.isExpired(maxDurationNanos))
|
||||||
|
{
|
||||||
|
boolean canClose = remove(connection, true);
|
||||||
|
if (canClose)
|
||||||
|
IO.close(connection);
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Connection removed{} due to expiration {} {}", (canClose ? " and closed" : ""), entry, pool);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Activated {} {}", entry, pool);
|
||||||
|
acquired(connection);
|
||||||
|
return connection;
|
||||||
|
}
|
||||||
|
return null;
|
||||||
}
|
}
|
||||||
return null;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -308,11 +352,10 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
if (!(connection instanceof Attachable))
|
if (!(connection instanceof Attachable))
|
||||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||||
Attachable attachable = (Attachable)connection;
|
Attachable attachable = (Attachable)connection;
|
||||||
@SuppressWarnings("unchecked")
|
EntryHolder holder = (EntryHolder)attachable.getAttachment();
|
||||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
if (holder == null)
|
||||||
if (entry == null)
|
|
||||||
return false;
|
return false;
|
||||||
return !entry.isIdle();
|
return !holder.entry.isIdle();
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -329,13 +372,12 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
if (!(connection instanceof Attachable))
|
if (!(connection instanceof Attachable))
|
||||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||||
Attachable attachable = (Attachable)connection;
|
Attachable attachable = (Attachable)connection;
|
||||||
@SuppressWarnings("unchecked")
|
EntryHolder holder = (EntryHolder)attachable.getAttachment();
|
||||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
if (holder == null)
|
||||||
if (entry == null)
|
|
||||||
return true;
|
return true;
|
||||||
boolean reusable = pool.release(entry);
|
boolean reusable = pool.release(holder.entry);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Released ({}) {} {}", reusable, entry, pool);
|
LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool);
|
||||||
if (reusable)
|
if (reusable)
|
||||||
return true;
|
return true;
|
||||||
remove(connection);
|
remove(connection);
|
||||||
|
@ -353,14 +395,14 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
if (!(connection instanceof Attachable))
|
if (!(connection instanceof Attachable))
|
||||||
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
throw new IllegalArgumentException("Invalid connection object: " + connection);
|
||||||
Attachable attachable = (Attachable)connection;
|
Attachable attachable = (Attachable)connection;
|
||||||
@SuppressWarnings("unchecked")
|
EntryHolder holder = (EntryHolder)attachable.getAttachment();
|
||||||
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
|
if (holder == null)
|
||||||
if (entry == null)
|
|
||||||
return false;
|
return false;
|
||||||
attachable.setAttachment(null);
|
boolean removed = pool.remove(holder.entry);
|
||||||
boolean removed = pool.remove(entry);
|
if (removed)
|
||||||
|
attachable.setAttachment(null);
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Removed ({}) {} {}", removed, entry, pool);
|
LOG.debug("Removed ({}) {} {}", removed, holder.entry, pool);
|
||||||
if (removed || force)
|
if (removed || force)
|
||||||
{
|
{
|
||||||
released(connection);
|
released(connection);
|
||||||
|
@ -433,20 +475,22 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
@Override
|
@Override
|
||||||
public boolean sweep()
|
public boolean sweep()
|
||||||
{
|
{
|
||||||
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
|
pool.values().stream()
|
||||||
{
|
.map(Pool.Entry::getPooled)
|
||||||
Connection connection = entry.getPooled();
|
.filter(connection -> connection instanceof Sweeper.Sweepable)
|
||||||
if (((Sweeper.Sweepable)connection).sweep())
|
.forEach(connection ->
|
||||||
{
|
{
|
||||||
boolean removed = remove(connection);
|
if (((Sweeper.Sweepable)connection).sweep())
|
||||||
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
|
{
|
||||||
connection,
|
boolean removed = remove(connection);
|
||||||
System.lineSeparator(),
|
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
|
||||||
removed ? "Removed" : "Not removed",
|
connection,
|
||||||
System.lineSeparator(),
|
System.lineSeparator(),
|
||||||
dump());
|
removed ? "Removed" : "Not removed",
|
||||||
}
|
System.lineSeparator(),
|
||||||
});
|
dump());
|
||||||
|
}
|
||||||
|
});
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -480,7 +524,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
|
LOG.debug("Connection creation succeeded {}: {}", reserved, connection);
|
||||||
if (connection instanceof Attachable)
|
if (connection instanceof Attachable)
|
||||||
{
|
{
|
||||||
((Attachable)connection).setAttachment(reserved);
|
((Attachable)connection).setAttachment(new EntryHolder(reserved));
|
||||||
onCreated(connection);
|
onCreated(connection);
|
||||||
pending.decrementAndGet();
|
pending.decrementAndGet();
|
||||||
reserved.enable(connection, false);
|
reserved.enable(connection, false);
|
||||||
|
@ -507,4 +551,20 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
requester.failed(x);
|
requester.failed(x);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private static class EntryHolder
|
||||||
|
{
|
||||||
|
private final Pool<Connection>.Entry entry;
|
||||||
|
private final long creationTimestamp = System.nanoTime();
|
||||||
|
|
||||||
|
private EntryHolder(Pool<Connection>.Entry entry)
|
||||||
|
{
|
||||||
|
this.entry = Objects.requireNonNull(entry);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isExpired(long timeoutNanos)
|
||||||
|
{
|
||||||
|
return System.nanoTime() - creationTimestamp >= timeoutNanos;
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -26,12 +26,14 @@ import java.util.concurrent.CountDownLatch;
|
||||||
import java.util.concurrent.CyclicBarrier;
|
import java.util.concurrent.CyclicBarrier;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.IntStream;
|
import java.util.stream.IntStream;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
import javax.servlet.ServletException;
|
import javax.servlet.ServletException;
|
||||||
import javax.servlet.http.HttpServletRequest;
|
import javax.servlet.http.HttpServletRequest;
|
||||||
import javax.servlet.http.HttpServletResponse;
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
import org.eclipse.jetty.client.api.ContentResponse;
|
import org.eclipse.jetty.client.api.ContentResponse;
|
||||||
import org.eclipse.jetty.client.api.Destination;
|
import org.eclipse.jetty.client.api.Destination;
|
||||||
import org.eclipse.jetty.client.api.Request;
|
import org.eclipse.jetty.client.api.Request;
|
||||||
|
@ -50,6 +52,7 @@ import org.eclipse.jetty.util.SocketAddressResolver;
|
||||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
import org.junit.jupiter.api.AfterEach;
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.MethodSource;
|
import org.junit.jupiter.params.provider.MethodSource;
|
||||||
|
|
||||||
|
@ -73,6 +76,12 @@ public class ConnectionPoolTest
|
||||||
{
|
{
|
||||||
return Stream.of(
|
return Stream.of(
|
||||||
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
|
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
|
||||||
|
new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||||
|
{
|
||||||
|
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination);
|
||||||
|
pool.setMaxDuration(10);
|
||||||
|
return pool;
|
||||||
|
}),
|
||||||
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
|
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
|
||||||
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
|
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
|
||||||
);
|
);
|
||||||
|
@ -432,6 +441,116 @@ public class ConnectionPoolTest
|
||||||
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
|
assertThat(connectionPool.getConnectionCount(), Matchers.greaterThanOrEqualTo(count));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxDurationConnectionsWithConstrainedPool() throws Exception
|
||||||
|
{
|
||||||
|
// ConnectionPool may NOT open more connections than expected because
|
||||||
|
// it is constrained to a single connection in this test.
|
||||||
|
|
||||||
|
final int maxConnections = 1;
|
||||||
|
final int maxDuration = 30;
|
||||||
|
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||||
|
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||||
|
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||||
|
{
|
||||||
|
// Constrain the max pool size to 1.
|
||||||
|
DuplexConnectionPool pool = new DuplexConnectionPool(destination, maxConnections, destination)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onCreated(Connection connection)
|
||||||
|
{
|
||||||
|
poolCreateCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void removed(Connection connection)
|
||||||
|
{
|
||||||
|
poolRemoveCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
pool.setMaxDuration(maxDuration);
|
||||||
|
return pool;
|
||||||
|
});
|
||||||
|
|
||||||
|
startServer(new EmptyServerHandler());
|
||||||
|
|
||||||
|
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
|
||||||
|
transport.setConnectionPoolFactory(factory.factory);
|
||||||
|
client = new HttpClient(transport, null);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
|
||||||
|
for (int i = 0; i < 5; i++)
|
||||||
|
{
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send();
|
||||||
|
assertThat(response.getStatus(), Matchers.is(200));
|
||||||
|
|
||||||
|
Thread.sleep(maxDuration * 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the pool created 5 and removed 4 connections;
|
||||||
|
// it must be exactly 4 removed b/c each cycle of the loop
|
||||||
|
// can only open 1 connection as the pool is constrained to
|
||||||
|
// maximum 1 connection.
|
||||||
|
assertThat(poolCreateCounter.get(), Matchers.is(5));
|
||||||
|
assertThat(poolRemoveCounter.get(), Matchers.is(4));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxDurationConnectionsWithUnconstrainedPool() throws Exception
|
||||||
|
{
|
||||||
|
// ConnectionPools may open a few more connections than expected.
|
||||||
|
|
||||||
|
final int maxDuration = 30;
|
||||||
|
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||||
|
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||||
|
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||||
|
{
|
||||||
|
DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onCreated(Connection connection)
|
||||||
|
{
|
||||||
|
poolCreateCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void removed(Connection connection)
|
||||||
|
{
|
||||||
|
poolRemoveCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
pool.setMaxDuration(maxDuration);
|
||||||
|
return pool;
|
||||||
|
});
|
||||||
|
|
||||||
|
startServer(new EmptyServerHandler());
|
||||||
|
|
||||||
|
HttpClientTransport transport = new HttpClientTransportOverHTTP(1);
|
||||||
|
transport.setConnectionPoolFactory(factory.factory);
|
||||||
|
client = new HttpClient(transport, null);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
|
||||||
|
for (int i = 0; i < 5; i++)
|
||||||
|
{
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send();
|
||||||
|
assertThat(response.getStatus(), Matchers.is(200));
|
||||||
|
|
||||||
|
Thread.sleep(maxDuration * 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the pool created 5 and removed at least 4 connections;
|
||||||
|
// it can be more than 4 removed b/c each cycle of the loop may
|
||||||
|
// open more than 1 connection as the pool is not constrained.
|
||||||
|
assertThat(poolCreateCounter.get(), Matchers.is(5));
|
||||||
|
assertThat(poolRemoveCounter.get(), Matchers.greaterThanOrEqualTo(4));
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource("pools")
|
@MethodSource("pools")
|
||||||
public void testConnectionMaxUsage(ConnectionPoolFactory factory) throws Exception
|
public void testConnectionMaxUsage(ConnectionPoolFactory factory) throws Exception
|
||||||
|
|
|
@ -0,0 +1,309 @@
|
||||||
|
//
|
||||||
|
// ========================================================================
|
||||||
|
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||||
|
// ------------------------------------------------------------------------
|
||||||
|
// All rights reserved. This program and the accompanying materials
|
||||||
|
// are made available under the terms of the Eclipse Public License v1.0
|
||||||
|
// and Apache License v2.0 which accompanies this distribution.
|
||||||
|
//
|
||||||
|
// The Eclipse Public License is available at
|
||||||
|
// http://www.eclipse.org/legal/epl-v10.html
|
||||||
|
//
|
||||||
|
// The Apache License v2.0 is available at
|
||||||
|
// http://www.opensource.org/licenses/apache2.0.php
|
||||||
|
//
|
||||||
|
// You may elect to redistribute this code under either of these licenses.
|
||||||
|
// ========================================================================
|
||||||
|
//
|
||||||
|
|
||||||
|
package org.eclipse.jetty.http2.client.http;
|
||||||
|
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.atomic.AtomicReference;
|
||||||
|
import javax.servlet.ServletException;
|
||||||
|
import javax.servlet.http.HttpServletRequest;
|
||||||
|
import javax.servlet.http.HttpServletResponse;
|
||||||
|
|
||||||
|
import org.eclipse.jetty.client.ConnectionPool;
|
||||||
|
import org.eclipse.jetty.client.HttpClient;
|
||||||
|
import org.eclipse.jetty.client.HttpClientTransport;
|
||||||
|
import org.eclipse.jetty.client.MultiplexConnectionPool;
|
||||||
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
|
import org.eclipse.jetty.client.api.ContentResponse;
|
||||||
|
import org.eclipse.jetty.http2.client.HTTP2Client;
|
||||||
|
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||||
|
import org.eclipse.jetty.server.Handler;
|
||||||
|
import org.eclipse.jetty.server.HttpConfiguration;
|
||||||
|
import org.eclipse.jetty.server.Request;
|
||||||
|
import org.eclipse.jetty.server.Server;
|
||||||
|
import org.eclipse.jetty.server.ServerConnector;
|
||||||
|
import org.eclipse.jetty.util.Pool;
|
||||||
|
import org.hamcrest.Matchers;
|
||||||
|
import org.junit.jupiter.api.AfterEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
|
import static org.hamcrest.Matchers.is;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
// Sibling of ConnectionPoolTest, but using H2 to multiplex connections.
|
||||||
|
public class MultiplexedConnectionPoolTest
|
||||||
|
{
|
||||||
|
private static final int MAX_MULTIPLEX = 2;
|
||||||
|
|
||||||
|
private Server server;
|
||||||
|
private ServerConnector connector;
|
||||||
|
private HttpClient client;
|
||||||
|
|
||||||
|
private void startServer(Handler handler) throws Exception
|
||||||
|
{
|
||||||
|
server = new Server();
|
||||||
|
HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration());
|
||||||
|
http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX);
|
||||||
|
connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory);
|
||||||
|
server.addConnector(connector);
|
||||||
|
server.setHandler(handler);
|
||||||
|
server.start();
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void disposeServer() throws Exception
|
||||||
|
{
|
||||||
|
connector = null;
|
||||||
|
if (server != null)
|
||||||
|
{
|
||||||
|
server.stop();
|
||||||
|
server = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@AfterEach
|
||||||
|
public void disposeClient() throws Exception
|
||||||
|
{
|
||||||
|
if (client != null)
|
||||||
|
{
|
||||||
|
client.stop();
|
||||||
|
client = null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception
|
||||||
|
{
|
||||||
|
final int maxDuration = 30;
|
||||||
|
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||||
|
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||||
|
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
|
||||||
|
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||||
|
{
|
||||||
|
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
|
||||||
|
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
|
||||||
|
poolRef.set(pool);
|
||||||
|
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onCreated(Connection connection)
|
||||||
|
{
|
||||||
|
poolCreateCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void removed(Connection connection)
|
||||||
|
{
|
||||||
|
poolRemoveCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
connectionPool.setMaxDuration(maxDuration);
|
||||||
|
return connectionPool;
|
||||||
|
});
|
||||||
|
|
||||||
|
startServer(new EmptyServerHandler());
|
||||||
|
|
||||||
|
HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
|
||||||
|
transport.setConnectionPoolFactory(factory.factory);
|
||||||
|
client = new HttpClient(transport, null);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
// Use the connection pool 5 times with a delay that is longer than the max duration in between each time.
|
||||||
|
for (int i = 0; i < 5; i++)
|
||||||
|
{
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send();
|
||||||
|
assertThat(response.getStatus(), Matchers.is(200));
|
||||||
|
|
||||||
|
// Check that the pool never grows above 1.
|
||||||
|
assertThat(poolRef.get().size(), is(1));
|
||||||
|
|
||||||
|
Thread.sleep(maxDuration * 2);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check that the pool created 5 and removed 4 connections;
|
||||||
|
// it must be exactly 4 removed b/c while the pool is not
|
||||||
|
// constrained, it can multiplex requests on a single connection
|
||||||
|
// so that should prevent opening more connections than needed.
|
||||||
|
assertThat(poolCreateCounter.get(), is(5));
|
||||||
|
assertThat(poolRemoveCounter.get(), is(4));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxDurationConnectionsWithMultiplexedPoolClosesExpiredConnectionWhileStillInUse() throws Exception
|
||||||
|
{
|
||||||
|
final int maxDuration = 1000;
|
||||||
|
final int maxIdle = 2000;
|
||||||
|
|
||||||
|
AtomicInteger poolCreateCounter = new AtomicInteger();
|
||||||
|
AtomicInteger poolRemoveCounter = new AtomicInteger();
|
||||||
|
AtomicReference<Pool<Connection>> poolRef = new AtomicReference<>();
|
||||||
|
ConnectionPoolFactory factory = new ConnectionPoolFactory("duplex-maxDuration", destination ->
|
||||||
|
{
|
||||||
|
int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination();
|
||||||
|
Pool<Connection> pool = new Pool<>(Pool.StrategyType.FIRST, maxConnections, false);
|
||||||
|
poolRef.set(pool);
|
||||||
|
MultiplexConnectionPool connectionPool = new MultiplexConnectionPool(destination, pool, destination, MAX_MULTIPLEX)
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void onCreated(Connection connection)
|
||||||
|
{
|
||||||
|
poolCreateCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
protected void removed(Connection connection)
|
||||||
|
{
|
||||||
|
poolRemoveCounter.incrementAndGet();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
connectionPool.setMaxDuration(maxDuration);
|
||||||
|
return connectionPool;
|
||||||
|
});
|
||||||
|
|
||||||
|
Semaphore handlerSignalingSemaphore = new Semaphore(0);
|
||||||
|
Semaphore handlerWaitingSemaphore = new Semaphore(0);
|
||||||
|
startServer(new EmptyServerHandler()
|
||||||
|
{
|
||||||
|
@Override
|
||||||
|
protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException
|
||||||
|
{
|
||||||
|
if (!target.equals("/block"))
|
||||||
|
return;
|
||||||
|
|
||||||
|
handlerSignalingSemaphore.release();
|
||||||
|
|
||||||
|
try
|
||||||
|
{
|
||||||
|
handlerWaitingSemaphore.acquire();
|
||||||
|
}
|
||||||
|
catch (Exception e)
|
||||||
|
{
|
||||||
|
throw new ServletException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client());
|
||||||
|
transport.setConnectionPoolFactory(factory.factory);
|
||||||
|
client = new HttpClient(transport, null);
|
||||||
|
client.setIdleTimeout(maxIdle);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
CountDownLatch latch1 = new CountDownLatch(1);
|
||||||
|
CountDownLatch latch2 = new CountDownLatch(2);
|
||||||
|
// create 2 requests that are going to consume all the multiplexing slots
|
||||||
|
client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.path("/block")
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send(result ->
|
||||||
|
{
|
||||||
|
if (result.isSucceeded())
|
||||||
|
{
|
||||||
|
latch1.countDown();
|
||||||
|
latch2.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for the 1st request to be serviced to make sure only 1 connection gets created
|
||||||
|
handlerSignalingSemaphore.acquire();
|
||||||
|
|
||||||
|
client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.path("/block")
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send(result ->
|
||||||
|
{
|
||||||
|
if (result.isSucceeded())
|
||||||
|
{
|
||||||
|
latch1.countDown();
|
||||||
|
latch2.countDown();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// wait for both requests to start being serviced
|
||||||
|
handlerSignalingSemaphore.acquire();
|
||||||
|
|
||||||
|
assertThat(poolCreateCounter.get(), is(1));
|
||||||
|
|
||||||
|
// finalize 1 request, freeing up 1 multiplexing slot
|
||||||
|
handlerWaitingSemaphore.release();
|
||||||
|
// wait until 1st request finished
|
||||||
|
assertTrue(latch1.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertThat(poolRef.get().getInUseCount(), is(1));
|
||||||
|
assertThat(poolRef.get().getIdleCount(), is(0));
|
||||||
|
assertThat(poolRef.get().getClosedCount(), is(0));
|
||||||
|
assertThat(poolRef.get().size(), is(1));
|
||||||
|
|
||||||
|
// wait for the connection to expire
|
||||||
|
Thread.sleep(maxDuration + 500);
|
||||||
|
|
||||||
|
// send a 3rd request that will close the expired multiplexed connection
|
||||||
|
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.path("/do-not-block")
|
||||||
|
.timeout(5, TimeUnit.SECONDS)
|
||||||
|
.send();
|
||||||
|
assertThat(response.getStatus(), is(200));
|
||||||
|
|
||||||
|
assertThat(poolRef.get().getInUseCount(), is(0));
|
||||||
|
assertThat(poolRef.get().getIdleCount(), is(1));
|
||||||
|
assertThat(poolRef.get().getClosedCount(), is(1));
|
||||||
|
assertThat(poolRef.get().size(), is(2));
|
||||||
|
|
||||||
|
// unblock 2nd request
|
||||||
|
handlerWaitingSemaphore.release();
|
||||||
|
//wait until 2nd request finished
|
||||||
|
assertTrue(latch2.await(5, TimeUnit.SECONDS));
|
||||||
|
|
||||||
|
assertThat(poolRef.get().getInUseCount(), is(0));
|
||||||
|
assertThat(poolRef.get().getIdleCount(), is(1));
|
||||||
|
assertThat(poolRef.get().getClosedCount(), is(0));
|
||||||
|
assertThat(poolRef.get().size(), is(1));
|
||||||
|
assertThat(poolCreateCounter.get(), is(2));
|
||||||
|
|
||||||
|
// wait for idle connections to be closed
|
||||||
|
Thread.sleep(maxIdle + 500);
|
||||||
|
|
||||||
|
assertThat(poolRef.get().getIdleCount(), is(0));
|
||||||
|
assertThat(poolRef.get().size(), is(0));
|
||||||
|
assertThat(poolRemoveCounter.get(), is(3));
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class ConnectionPoolFactory
|
||||||
|
{
|
||||||
|
private final String name;
|
||||||
|
private final ConnectionPool.Factory factory;
|
||||||
|
|
||||||
|
private ConnectionPoolFactory(String name, ConnectionPool.Factory factory)
|
||||||
|
{
|
||||||
|
this.name = name;
|
||||||
|
this.factory = factory;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString()
|
||||||
|
{
|
||||||
|
return name;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
|
@ -149,6 +149,11 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
||||||
return (int)entries.stream().filter(Entry::isInUse).count();
|
return (int)entries.stream().filter(Entry::isInUse).count();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public int getClosedCount()
|
||||||
|
{
|
||||||
|
return (int)entries.stream().filter(Entry::isClosed).count();
|
||||||
|
}
|
||||||
|
|
||||||
public int getMaxEntries()
|
public int getMaxEntries()
|
||||||
{
|
{
|
||||||
return maxEntries;
|
return maxEntries;
|
||||||
|
@ -627,8 +632,9 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Try to mark the entry as removed.
|
* Try to remove the entry by marking it as closed and decrementing the multiplexing counter.
|
||||||
* @return true if the entry has to be removed from the containing pool, false otherwise.
|
* The multiplexing counter will never go below zero and if it reaches zero, the entry is considered removed.
|
||||||
|
* @return true if the entry can be removed from the containing pool, false otherwise.
|
||||||
*/
|
*/
|
||||||
boolean tryRemove()
|
boolean tryRemove()
|
||||||
{
|
{
|
||||||
|
|
|
@ -24,7 +24,6 @@ import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
import java.util.stream.Stream;
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
@ -52,7 +51,24 @@ public class PoolTest
|
||||||
{
|
{
|
||||||
interface Factory
|
interface Factory
|
||||||
{
|
{
|
||||||
Pool<String> getPool(int maxSize);
|
Pool<CloseableHolder> getPool(int maxSize);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static class CloseableHolder implements Closeable
|
||||||
|
{
|
||||||
|
private boolean closed;
|
||||||
|
private final String value;
|
||||||
|
|
||||||
|
public CloseableHolder(String value)
|
||||||
|
{
|
||||||
|
this.value = value;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void close()
|
||||||
|
{
|
||||||
|
closed = true;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static Stream<Object[]> strategy()
|
public static Stream<Object[]> strategy()
|
||||||
|
@ -69,15 +85,15 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testAcquireRelease(Factory factory)
|
public void testAcquireRelease(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve(-1).enable(new CloseableHolder("aaa"), false);
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(e1.getPooled(), equalTo("aaa"));
|
assertThat(e1.getPooled().value, equalTo("aaa"));
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(0));
|
assertThat(pool.getIdleCount(), is(0));
|
||||||
|
@ -93,8 +109,8 @@ public class PoolTest
|
||||||
|
|
||||||
assertThrows(IllegalStateException.class, e1::release);
|
assertThrows(IllegalStateException.class, e1::release);
|
||||||
|
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
assertThat(e2.getPooled(), equalTo("aaa"));
|
assertThat(e2.getPooled().value, equalTo("aaa"));
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(0));
|
assertThat(pool.getIdleCount(), is(0));
|
||||||
|
@ -113,10 +129,10 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testRemoveBeforeRelease(Factory factory)
|
public void testRemoveBeforeRelease(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve(-1).enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.remove(e1), is(true));
|
assertThat(pool.remove(e1), is(true));
|
||||||
assertThat(pool.remove(e1), is(false));
|
assertThat(pool.remove(e1), is(false));
|
||||||
assertThat(pool.release(e1), is(false));
|
assertThat(pool.release(e1), is(false));
|
||||||
|
@ -126,21 +142,22 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testCloseBeforeRelease(Factory factory)
|
public void testCloseBeforeRelease(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
pool.close();
|
pool.close();
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
assertThat(pool.release(e1), is(false));
|
assertThat(pool.release(e1), is(false));
|
||||||
|
assertThat(e1.getPooled().closed, is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMaxPoolSize(Factory factory)
|
public void testMaxPoolSize(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
assertThat(pool.reserve(), notNullValue());
|
assertThat(pool.reserve(), notNullValue());
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
|
@ -152,25 +169,25 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testReserve(Factory factory)
|
public void testReserve(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
|
|
||||||
// Reserve an entry
|
// Reserve an entry
|
||||||
Pool<String>.Entry e1 = pool.reserve();
|
Pool<CloseableHolder>.Entry e1 = pool.reserve();
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(1));
|
assertThat(pool.getReservedCount(), is(1));
|
||||||
assertThat(pool.getIdleCount(), is(0));
|
assertThat(pool.getIdleCount(), is(0));
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// enable the entry
|
// enable the entry
|
||||||
e1.enable("aaa", false);
|
e1.enable(new CloseableHolder("aaa"), false);
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// Reserve another entry
|
// Reserve another entry
|
||||||
Pool<String>.Entry e2 = pool.reserve();
|
Pool<CloseableHolder>.Entry e2 = pool.reserve();
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(1));
|
assertThat(pool.getReservedCount(), is(1));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
|
@ -184,35 +201,35 @@ public class PoolTest
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// Reserve another entry
|
// Reserve another entry
|
||||||
Pool<String>.Entry e3 = pool.reserve();
|
Pool<CloseableHolder>.Entry e3 = pool.reserve();
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(1));
|
assertThat(pool.getReservedCount(), is(1));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// enable and acquire the entry
|
// enable and acquire the entry
|
||||||
e3.enable("bbb", true);
|
e3.enable(new CloseableHolder("bbb"), true);
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(1));
|
assertThat(pool.getInUseCount(), is(1));
|
||||||
|
|
||||||
// can't reenable
|
// can't reenable
|
||||||
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
|
assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false));
|
||||||
|
|
||||||
// Can't enable acquired entry
|
// Can't enable acquired entry
|
||||||
Pool<String>.Entry e = pool.acquire();
|
Pool<CloseableHolder>.Entry e = pool.acquire();
|
||||||
assertThrows(IllegalStateException.class, () -> e.enable("xxx", false));
|
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testDeprecatedReserve(Factory factory)
|
public void testDeprecatedReserve(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
|
|
||||||
// Reserve an entry
|
// Reserve an entry
|
||||||
Pool<String>.Entry e1 = pool.reserve(-1);
|
Pool<CloseableHolder>.Entry e1 = pool.reserve(-1);
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(1));
|
assertThat(pool.getReservedCount(), is(1));
|
||||||
assertThat(pool.getIdleCount(), is(0));
|
assertThat(pool.getIdleCount(), is(0));
|
||||||
|
@ -226,14 +243,14 @@ public class PoolTest
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// enable the entry
|
// enable the entry
|
||||||
e1.enable("aaa", false);
|
e1.enable(new CloseableHolder("aaa"), false);
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// Reserve another entry
|
// Reserve another entry
|
||||||
Pool<String>.Entry e2 = pool.reserve(-1);
|
Pool<CloseableHolder>.Entry e2 = pool.reserve(-1);
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(1));
|
assertThat(pool.getReservedCount(), is(1));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
|
@ -247,32 +264,32 @@ public class PoolTest
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// Reserve another entry
|
// Reserve another entry
|
||||||
Pool<String>.Entry e3 = pool.reserve(-1);
|
Pool<CloseableHolder>.Entry e3 = pool.reserve(-1);
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(1));
|
assertThat(pool.getReservedCount(), is(1));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(0));
|
assertThat(pool.getInUseCount(), is(0));
|
||||||
|
|
||||||
// enable and acquire the entry
|
// enable and acquire the entry
|
||||||
e3.enable("bbb", true);
|
e3.enable(new CloseableHolder("bbb"), true);
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getIdleCount(), is(1));
|
assertThat(pool.getIdleCount(), is(1));
|
||||||
assertThat(pool.getInUseCount(), is(1));
|
assertThat(pool.getInUseCount(), is(1));
|
||||||
|
|
||||||
// can't reenable
|
// can't reenable
|
||||||
assertThrows(IllegalStateException.class, () -> e3.enable("xxx", false));
|
assertThrows(IllegalStateException.class, () -> e3.enable(new CloseableHolder("xxx"), false));
|
||||||
|
|
||||||
// Can't enable acquired entry
|
// Can't enable acquired entry
|
||||||
Pool<String>.Entry e = pool.acquire();
|
Pool<CloseableHolder>.Entry e = pool.acquire();
|
||||||
assertThrows(IllegalStateException.class, () -> e.enable("xxx", false));
|
assertThrows(IllegalStateException.class, () -> e.enable(new CloseableHolder("xxx"), false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testReserveNegativeMaxPending(Factory factory)
|
public void testReserveNegativeMaxPending(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
assertThat(pool.reserve(), notNullValue());
|
assertThat(pool.reserve(), notNullValue());
|
||||||
assertThat(pool.reserve(), notNullValue());
|
assertThat(pool.reserve(), notNullValue());
|
||||||
assertThat(pool.reserve(), nullValue());
|
assertThat(pool.reserve(), nullValue());
|
||||||
|
@ -282,8 +299,9 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testClose(Factory factory)
|
public void testClose(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.reserve().enable("aaa", false);
|
CloseableHolder holder = new CloseableHolder("aaa");
|
||||||
|
pool.reserve().enable(holder, false);
|
||||||
assertThat(pool.isClosed(), is(false));
|
assertThat(pool.isClosed(), is(false));
|
||||||
pool.close();
|
pool.close();
|
||||||
pool.close();
|
pool.close();
|
||||||
|
@ -292,28 +310,17 @@ public class PoolTest
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
assertThat(pool.acquire(), nullValue());
|
assertThat(pool.acquire(), nullValue());
|
||||||
assertThat(pool.reserve(), nullValue());
|
assertThat(pool.reserve(), nullValue());
|
||||||
}
|
assertThat(holder.closed, is(true));
|
||||||
|
|
||||||
@Test
|
|
||||||
public void testClosingCloseable()
|
|
||||||
{
|
|
||||||
AtomicBoolean closed = new AtomicBoolean();
|
|
||||||
Pool<Closeable> pool = new Pool<>(FIRST, 1);
|
|
||||||
Closeable pooled = () -> closed.set(true);
|
|
||||||
pool.reserve().enable(pooled, false);
|
|
||||||
assertThat(closed.get(), is(false));
|
|
||||||
pool.close();
|
|
||||||
assertThat(closed.get(), is(true));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testRemove(Factory factory)
|
public void testRemove(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.remove(e1), is(true));
|
assertThat(pool.remove(e1), is(true));
|
||||||
assertThat(pool.remove(e1), is(false));
|
assertThat(pool.remove(e1), is(false));
|
||||||
assertThat(pool.release(e1), is(false));
|
assertThat(pool.release(e1), is(false));
|
||||||
|
@ -325,13 +332,13 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testValuesSize(Factory factory)
|
public void testValuesSize(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
|
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
assertThat(pool.values().isEmpty(), is(true));
|
assertThat(pool.values().isEmpty(), is(true));
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
pool.reserve().enable("bbb", false);
|
pool.reserve().enable(new CloseableHolder("bbb"), false);
|
||||||
assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
|
assertThat(pool.values().stream().map(Pool.Entry::getPooled).map(closeableHolder -> closeableHolder.value).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -339,10 +346,10 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testValuesContainsAcquiredEntries(Factory factory)
|
public void testValuesContainsAcquiredEntries(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
|
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
pool.reserve().enable("bbb", false);
|
pool.reserve().enable(new CloseableHolder("bbb"), false);
|
||||||
assertThat(pool.acquire(), notNullValue());
|
assertThat(pool.acquire(), notNullValue());
|
||||||
assertThat(pool.acquire(), notNullValue());
|
assertThat(pool.acquire(), notNullValue());
|
||||||
assertThat(pool.acquire(), nullValue());
|
assertThat(pool.acquire(), nullValue());
|
||||||
|
@ -353,10 +360,10 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testAcquireAt(Factory factory)
|
public void testAcquireAt(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
|
|
||||||
pool.reserve(-1).enable("aaa", false);
|
pool.reserve(-1).enable(new CloseableHolder("aaa"), false);
|
||||||
pool.reserve(-1).enable("bbb", false);
|
pool.reserve(-1).enable(new CloseableHolder("bbb"), false);
|
||||||
|
|
||||||
assertThat(pool.acquireAt(2), nullValue());
|
assertThat(pool.acquireAt(2), nullValue());
|
||||||
assertThat(pool.acquireAt(0), notNullValue());
|
assertThat(pool.acquireAt(0), notNullValue());
|
||||||
|
@ -369,11 +376,11 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMaxUsageCount(Factory factory)
|
public void testMaxUsageCount(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxUsageCount(3);
|
pool.setMaxUsageCount(3);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.release(e1), is(true));
|
assertThat(pool.release(e1), is(true));
|
||||||
e1 = pool.acquire();
|
e1 = pool.acquire();
|
||||||
assertThat(pool.release(e1), is(true));
|
assertThat(pool.release(e1), is(true));
|
||||||
|
@ -384,7 +391,7 @@ public class PoolTest
|
||||||
assertThat(pool.remove(e1), is(true));
|
assertThat(pool.remove(e1), is(true));
|
||||||
assertThat(pool.remove(e1), is(false));
|
assertThat(pool.remove(e1), is(false));
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
Pool<String>.Entry e1Copy = e1;
|
Pool<CloseableHolder>.Entry e1Copy = e1;
|
||||||
assertThat(pool.release(e1Copy), is(false));
|
assertThat(pool.release(e1Copy), is(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -392,7 +399,7 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMaxMultiplex(Factory factory)
|
public void testMaxMultiplex(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
pool.setMaxMultiplex(3);
|
pool.setMaxMultiplex(3);
|
||||||
|
|
||||||
Map<String, AtomicInteger> counts = new HashMap<>();
|
Map<String, AtomicInteger> counts = new HashMap<>();
|
||||||
|
@ -400,21 +407,21 @@ public class PoolTest
|
||||||
AtomicInteger b = new AtomicInteger();
|
AtomicInteger b = new AtomicInteger();
|
||||||
counts.put("a", a);
|
counts.put("a", a);
|
||||||
counts.put("b", b);
|
counts.put("b", b);
|
||||||
pool.reserve().enable("a", false);
|
pool.reserve().enable(new CloseableHolder("a"), false);
|
||||||
pool.reserve().enable("b", false);
|
pool.reserve().enable(new CloseableHolder("b"), false);
|
||||||
|
|
||||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||||
|
|
||||||
assertThat(a.get(), greaterThan(0));
|
assertThat(a.get(), greaterThan(0));
|
||||||
assertThat(a.get(), lessThanOrEqualTo(3));
|
assertThat(a.get(), lessThanOrEqualTo(3));
|
||||||
assertThat(b.get(), greaterThan(0));
|
assertThat(b.get(), greaterThan(0));
|
||||||
assertThat(b.get(), lessThanOrEqualTo(3));
|
assertThat(b.get(), lessThanOrEqualTo(3));
|
||||||
|
|
||||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
counts.get(pool.acquire().getPooled().value).incrementAndGet();
|
||||||
|
|
||||||
assertThat(a.get(), is(3));
|
assertThat(a.get(), is(3));
|
||||||
assertThat(b.get(), is(3));
|
assertThat(b.get(), is(3));
|
||||||
|
@ -426,13 +433,13 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testRemoveMultiplexed(Factory factory)
|
public void testRemoveMultiplexed(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(e1, notNullValue());
|
assertThat(e1, notNullValue());
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
assertThat(e2, notNullValue());
|
assertThat(e2, notNullValue());
|
||||||
assertThat(e2, sameInstance(e1));
|
assertThat(e2, sameInstance(e1));
|
||||||
assertThat(e2.getUsageCount(), is(2));
|
assertThat(e2.getUsageCount(), is(2));
|
||||||
|
@ -456,12 +463,12 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
|
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
|
|
||||||
assertThat(pool.remove(e1), is(false));
|
assertThat(pool.remove(e1), is(false));
|
||||||
assertThat(e1.isClosed(), is(true));
|
assertThat(e1.isClosed(), is(true));
|
||||||
|
@ -474,11 +481,11 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
|
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.remove(e1), is(true));
|
assertThat(pool.remove(e1), is(true));
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
}
|
}
|
||||||
|
@ -487,12 +494,12 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMultiplexRemoveAfterAcquire(Factory factory)
|
public void testMultiplexRemoveAfterAcquire(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
|
|
||||||
assertThat(pool.remove(e1), is(false));
|
assertThat(pool.remove(e1), is(false));
|
||||||
assertThat(pool.remove(e2), is(true));
|
assertThat(pool.remove(e2), is(true));
|
||||||
|
@ -501,7 +508,7 @@ public class PoolTest
|
||||||
assertThat(pool.release(e1), is(false));
|
assertThat(pool.release(e1), is(false));
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
|
|
||||||
Pool<String>.Entry e3 = pool.acquire();
|
Pool<CloseableHolder>.Entry e3 = pool.acquire();
|
||||||
assertThat(e3, nullValue());
|
assertThat(e3, nullValue());
|
||||||
|
|
||||||
assertThat(pool.release(e2), is(false));
|
assertThat(pool.release(e2), is(false));
|
||||||
|
@ -512,8 +519,8 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testReleaseThenRemoveNonEnabledEntry(Factory factory)
|
public void testReleaseThenRemoveNonEnabledEntry(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
Pool<String>.Entry e = pool.reserve();
|
Pool<CloseableHolder>.Entry e = pool.reserve();
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.release(e), is(false));
|
assertThat(pool.release(e), is(false));
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
|
@ -525,8 +532,8 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testRemoveNonEnabledEntry(Factory factory)
|
public void testRemoveNonEnabledEntry(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
Pool<String>.Entry e = pool.reserve();
|
Pool<CloseableHolder>.Entry e = pool.reserve();
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.remove(e), is(true));
|
assertThat(pool.remove(e), is(true));
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
|
@ -536,16 +543,16 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
|
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.setMaxUsageCount(3);
|
pool.setMaxUsageCount(3);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e0 = pool.acquire();
|
Pool<CloseableHolder>.Entry e0 = pool.acquire();
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.release(e1), is(true));
|
assertThat(pool.release(e1), is(true));
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
assertThat(pool.release(e2), is(true));
|
assertThat(pool.release(e2), is(true));
|
||||||
assertThat(pool.acquire(), nullValue());
|
assertThat(pool.acquire(), nullValue());
|
||||||
|
|
||||||
|
@ -557,16 +564,16 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
|
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.setMaxUsageCount(3);
|
pool.setMaxUsageCount(3);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e0 = pool.acquire();
|
Pool<CloseableHolder>.Entry e0 = pool.acquire();
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(pool.release(e1), is(true));
|
assertThat(pool.release(e1), is(true));
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
assertThat(pool.release(e2), is(true));
|
assertThat(pool.release(e2), is(true));
|
||||||
assertThat(pool.acquire(), nullValue());
|
assertThat(pool.acquire(), nullValue());
|
||||||
|
|
||||||
|
@ -582,14 +589,14 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
|
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
pool.setMaxMultiplex(2);
|
pool.setMaxMultiplex(2);
|
||||||
pool.setMaxUsageCount(10);
|
pool.setMaxUsageCount(10);
|
||||||
pool.reserve().enable("aaa", false);
|
pool.reserve().enable(new CloseableHolder("aaa"), false);
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire();
|
Pool<CloseableHolder>.Entry e1 = pool.acquire();
|
||||||
assertThat(e1.getUsageCount(), is(1));
|
assertThat(e1.getUsageCount(), is(1));
|
||||||
Pool<String>.Entry e2 = pool.acquire();
|
Pool<CloseableHolder>.Entry e2 = pool.acquire();
|
||||||
assertThat(e2, sameInstance(e1));
|
assertThat(e2, sameInstance(e1));
|
||||||
assertThat(e1.getUsageCount(), is(2));
|
assertThat(e1.getUsageCount(), is(2));
|
||||||
assertThat(pool.acquire(), nullValue());
|
assertThat(pool.acquire(), nullValue());
|
||||||
|
@ -600,58 +607,61 @@ public class PoolTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory)
|
public void testDynamicMaxUsageCountChangeOverflowMaxInt(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(1);
|
Pool<CloseableHolder> pool = factory.getPool(1);
|
||||||
Pool<String>.Entry entry = pool.reserve();
|
Pool<CloseableHolder>.Entry entry = pool.reserve();
|
||||||
entry.enable("aaa", false);
|
entry.enable(new CloseableHolder("aaa"), false);
|
||||||
entry.setUsageCount(Integer.MAX_VALUE);
|
entry.setUsageCount(Integer.MAX_VALUE);
|
||||||
|
|
||||||
Pool<String>.Entry acquired1 = pool.acquire();
|
Pool<CloseableHolder>.Entry acquired1 = pool.acquire();
|
||||||
assertThat(acquired1, notNullValue());
|
assertThat(acquired1, notNullValue());
|
||||||
assertThat(pool.release(acquired1), is(true));
|
assertThat(pool.release(acquired1), is(true));
|
||||||
|
|
||||||
pool.setMaxUsageCount(1);
|
pool.setMaxUsageCount(1);
|
||||||
Pool<String>.Entry acquired2 = pool.acquire();
|
Pool<CloseableHolder>.Entry acquired2 = pool.acquire();
|
||||||
assertThat(acquired2, nullValue());
|
assertThat(acquired2, nullValue());
|
||||||
|
assertThat(entry.getPooled().closed, is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testDynamicMaxUsageCountChangeSweep(Factory factory)
|
public void testDynamicMaxUsageCountChangeSweep(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
Pool<String>.Entry entry1 = pool.reserve();
|
Pool<CloseableHolder>.Entry entry1 = pool.reserve();
|
||||||
entry1.enable("aaa", false);
|
entry1.enable(new CloseableHolder("aaa"), false);
|
||||||
Pool<String>.Entry entry2 = pool.reserve();
|
Pool<CloseableHolder>.Entry entry2 = pool.reserve();
|
||||||
entry2.enable("bbb", false);
|
entry2.enable(new CloseableHolder("bbb"), false);
|
||||||
|
|
||||||
Pool<String>.Entry acquired1 = pool.acquire();
|
Pool<CloseableHolder>.Entry acquired1 = pool.acquire();
|
||||||
assertThat(acquired1, notNullValue());
|
assertThat(acquired1, notNullValue());
|
||||||
assertThat(pool.release(acquired1), is(true));
|
assertThat(pool.release(acquired1), is(true));
|
||||||
|
|
||||||
|
assertThat(pool.size(), is(2));
|
||||||
pool.setMaxUsageCount(1);
|
pool.setMaxUsageCount(1);
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
|
assertThat(entry1.getPooled().closed ^ entry2.getPooled().closed, is(true));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConfigLimits()
|
public void testConfigLimits()
|
||||||
{
|
{
|
||||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(0));
|
assertThrows(IllegalArgumentException.class, () -> new Pool<CloseableHolder>(FIRST, 1).setMaxMultiplex(0));
|
||||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(-1));
|
assertThrows(IllegalArgumentException.class, () -> new Pool<CloseableHolder>(FIRST, 1).setMaxMultiplex(-1));
|
||||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxUsageCount(0));
|
assertThrows(IllegalArgumentException.class, () -> new Pool<CloseableHolder>(FIRST, 1).setMaxUsageCount(0));
|
||||||
}
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@MethodSource(value = "strategy")
|
@MethodSource(value = "strategy")
|
||||||
public void testAcquireWithCreator(Factory factory)
|
public void testAcquireWithCreator(Factory factory)
|
||||||
{
|
{
|
||||||
Pool<String> pool = factory.getPool(2);
|
Pool<CloseableHolder> pool = factory.getPool(2);
|
||||||
|
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
assertThat(pool.acquire(e -> null), nullValue());
|
assertThat(pool.acquire(e -> null), nullValue());
|
||||||
assertThat(pool.size(), is(0));
|
assertThat(pool.size(), is(0));
|
||||||
|
|
||||||
Pool<String>.Entry e1 = pool.acquire(e -> "e1");
|
Pool<CloseableHolder>.Entry e1 = pool.acquire(e -> new CloseableHolder("e1"));
|
||||||
assertThat(e1.getPooled(), is("e1"));
|
assertThat(e1.getPooled().value, is("e1"));
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getInUseCount(), is(1));
|
assertThat(pool.getInUseCount(), is(1));
|
||||||
|
@ -659,13 +669,13 @@ public class PoolTest
|
||||||
assertThat(pool.acquire(e -> null), nullValue());
|
assertThat(pool.acquire(e -> null), nullValue());
|
||||||
assertThat(pool.size(), is(1));
|
assertThat(pool.size(), is(1));
|
||||||
|
|
||||||
Pool<String>.Entry e2 = pool.acquire(e -> "e2");
|
Pool<CloseableHolder>.Entry e2 = pool.acquire(e -> new CloseableHolder("e2"));
|
||||||
assertThat(e2.getPooled(), is("e2"));
|
assertThat(e2.getPooled().value, is("e2"));
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getInUseCount(), is(2));
|
assertThat(pool.getInUseCount(), is(2));
|
||||||
|
|
||||||
Pool<String>.Entry e3 = pool.acquire(e -> "e3");
|
Pool<CloseableHolder>.Entry e3 = pool.acquire(e -> new CloseableHolder("e3"));
|
||||||
assertThat(e3, nullValue());
|
assertThat(e3, nullValue());
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
|
@ -681,8 +691,8 @@ public class PoolTest
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getInUseCount(), is(1));
|
assertThat(pool.getInUseCount(), is(1));
|
||||||
|
|
||||||
Pool<String>.Entry e4 = pool.acquire(e -> "e4");
|
Pool<CloseableHolder>.Entry e4 = pool.acquire(e -> new CloseableHolder("e4"));
|
||||||
assertThat(e4.getPooled(), is("e2"));
|
assertThat(e4.getPooled().value, is("e2"));
|
||||||
assertThat(pool.size(), is(2));
|
assertThat(pool.size(), is(2));
|
||||||
assertThat(pool.getReservedCount(), is(0));
|
assertThat(pool.getReservedCount(), is(0));
|
||||||
assertThat(pool.getInUseCount(), is(2));
|
assertThat(pool.getInUseCount(), is(2));
|
||||||
|
|
Loading…
Reference in New Issue