Alternative Pool Strategies (#5218)
* Speculative idea to make a pluggable Pool strategy Signed-off-by: Greg Wilkins <gregw@webtide.com> * Speculative idea to make a pluggable Pool strategy + javadoc Signed-off-by: Greg Wilkins <gregw@webtide.com> * Speculative idea to make a pluggable Pool strategy + Added a ThreadLocalStrategy for a single cached item + Tell strategies about newly reserved entries + Fixed multiplexing test that was dependent on the impl of the cache Signed-off-by: Greg Wilkins <gregw@webtide.com> * Speculative idea to make a pluggable Pool strategy + added tests Signed-off-by: Greg Wilkins <gregw@webtide.com> * Feedback from review + Don't have a fallback iteration, instead make a SearchStrategy and DualStrategy * Feedback from review + split strategies into Cache and Strategies * Feedback from review + Added reserve and release * Improved Pool Strategies: + reverted to post notifications for removed, reserved and released. + Added a few more strategies that need to be benchmarked, that use the list iterator. Signed-off-by: Greg Wilkins <gregw@webtide.com> * Testing all the different strategies Signed-off-by: Greg Wilkins <gregw@webtide.com> * More simplifications and made LRU work (ish) Signed-off-by: Greg Wilkins <gregw@webtide.com> * javadoc * More javadoc Signed-off-by: Greg Wilkins <gregw@webtide.com> * JMH Test Signed-off-by: Greg Wilkins <gregw@webtide.com> * one strategy Signed-off-by: gregw <gregw@webtide.com> * test Signed-off-by: gregw <gregw@webtide.com> * Split implementations: + pluggable strategies + hard coded Signed-off-by: Greg Wilkins <gregw@webtide.com> * More benchmarks * Built in strategy * removed strategies version and simplified to single configurable solution. Signed-off-by: Greg Wilkins <gregw@webtide.com> * updates from review Signed-off-by: Greg Wilkins <gregw@webtide.com> * better javadoc Signed-off-by: Greg Wilkins <gregw@webtide.com> * Updated ConnectionPool classes to use Pool strategies * Small javadocs fixes. Signed-off-by: Simone Bordet <simone.bordet@gmail.com> * Updates from review * javadoc Co-authored-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
a37ab389aa
commit
ba22c08fde
|
@ -33,6 +33,7 @@ import org.eclipse.jetty.util.Pool;
|
|||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
@ -41,7 +42,7 @@ import org.eclipse.jetty.util.thread.Sweeper;
|
|||
import static java.util.stream.Collectors.toCollection;
|
||||
|
||||
@ManagedObject
|
||||
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable
|
||||
public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
|
||||
|
||||
|
@ -56,21 +57,26 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
|
|||
@Deprecated
|
||||
protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
this((HttpDestination)destination, maxConnections, true, requester);
|
||||
this((HttpDestination)destination, maxConnections, false, requester);
|
||||
}
|
||||
|
||||
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
|
||||
}
|
||||
|
||||
protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
|
||||
{
|
||||
this.destination = destination;
|
||||
this.requester = requester;
|
||||
@SuppressWarnings("unchecked")
|
||||
Pool<Connection> pool = destination.getBean(Pool.class);
|
||||
if (pool == null)
|
||||
{
|
||||
pool = new Pool<>(maxConnections, cache ? 1 : 0);
|
||||
destination.addBean(pool);
|
||||
}
|
||||
this.pool = pool;
|
||||
addBean(pool);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void doStop() throws Exception
|
||||
{
|
||||
pool.close();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
|
@ -27,12 +29,17 @@ public class DuplexConnectionPool extends AbstractConnectionPool
|
|||
{
|
||||
public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
this(destination, maxConnections, true, requester);
|
||||
this(destination, maxConnections, false, requester);
|
||||
}
|
||||
|
||||
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
|
||||
{
|
||||
super(destination, maxConnections, cache, requester);
|
||||
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
|
||||
}
|
||||
|
||||
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
|
||||
{
|
||||
super(destination, pool, requester);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -1,79 +0,0 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
/**
|
||||
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
|
||||
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
|
||||
* <p>The algorithm that decides the index value is decided by subclasses.</p>
|
||||
* <p>To acquire a connection, this class obtains the index value and attempts
|
||||
* to activate the pool entry at that index.
|
||||
* If this activation fails, another attempt to activate an alternative pool
|
||||
* entry is performed, to avoid stalling connection acquisition if there is
|
||||
* an available entry at a different index.</p>
|
||||
*/
|
||||
@ManagedObject
|
||||
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
|
||||
{
|
||||
private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class);
|
||||
|
||||
private final Pool<Connection> pool;
|
||||
|
||||
public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, maxConnections, false, requester, maxMultiplex);
|
||||
pool = destination.getBean(Pool.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
|
||||
* used to attempt to acquire the connection at that index in the pool.</p>
|
||||
*
|
||||
* @param maxConnections the upper bound of the index (exclusive)
|
||||
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
|
||||
*/
|
||||
protected abstract int getIndex(int maxConnections);
|
||||
|
||||
@Override
|
||||
protected Connection activate()
|
||||
{
|
||||
int index = getIndex(getMaxConnectionCount());
|
||||
Pool<Connection>.Entry entry = pool.acquireAt(index);
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activating at index={} entry={}", index, entry);
|
||||
if (entry == null)
|
||||
{
|
||||
entry = pool.acquire();
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("activating alternative entry={}", entry);
|
||||
}
|
||||
if (entry == null)
|
||||
return null;
|
||||
Connection connection = entry.getPooled();
|
||||
acquired(connection);
|
||||
return connection;
|
||||
}
|
||||
}
|
|
@ -22,6 +22,7 @@ import org.eclipse.jetty.client.api.Connection;
|
|||
import org.eclipse.jetty.client.api.Destination;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.LeakDetector;
|
||||
import org.eclipse.jetty.util.component.LifeCycle;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
|
||||
|
@ -41,40 +42,16 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
|
|||
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
super((HttpDestination)destination, maxConnections, requester);
|
||||
start();
|
||||
}
|
||||
|
||||
private void start()
|
||||
{
|
||||
try
|
||||
{
|
||||
leakDetector.start();
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
addBean(leakDetector);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close()
|
||||
{
|
||||
stop();
|
||||
LifeCycle.stop(this);
|
||||
super.close();
|
||||
}
|
||||
|
||||
private void stop()
|
||||
{
|
||||
try
|
||||
{
|
||||
leakDetector.stop();
|
||||
}
|
||||
catch (Exception x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void acquired(Connection connection)
|
||||
{
|
||||
|
|
|
@ -18,7 +18,9 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import org.eclipse.jetty.client.api.Connection;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedAttribute;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
|
@ -27,12 +29,17 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
|
|||
{
|
||||
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
this(destination, maxConnections, true, requester, maxMultiplex);
|
||||
this(destination, maxConnections, false, requester, maxMultiplex);
|
||||
}
|
||||
|
||||
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, maxConnections, cache, requester);
|
||||
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
|
||||
}
|
||||
|
||||
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, pool, requester);
|
||||
setMaxMultiplex(maxMultiplex);
|
||||
}
|
||||
|
||||
|
|
|
@ -18,26 +18,19 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
/**
|
||||
* <p>An indexed {@link ConnectionPool} that provides connections
|
||||
* <p>A {@link ConnectionPool} that provides connections
|
||||
* randomly among the ones that are available.</p>
|
||||
*/
|
||||
@ManagedObject
|
||||
public class RandomConnectionPool extends IndexedConnectionPool
|
||||
public class RandomConnectionPool extends MultiplexConnectionPool
|
||||
{
|
||||
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, maxConnections, requester, maxMultiplex);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getIndex(int maxConnections)
|
||||
{
|
||||
return ThreadLocalRandom.current().nextInt(maxConnections);
|
||||
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -18,9 +18,8 @@
|
|||
|
||||
package org.eclipse.jetty.client;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Pool;
|
||||
import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||
|
||||
/**
|
||||
|
@ -30,7 +29,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
|
|||
* <li>the server takes different times to serve different requests; if a request takes a long
|
||||
* time to be processed by the server, it would be a performance penalty to stall sending requests
|
||||
* waiting for that connection to be available - better skip it and try another connection</li>
|
||||
* <li>connections may be closed by the client or by the server, so it should be a performance
|
||||
* <li>connections may be closed by the client or by the server, so it would be a performance
|
||||
* penalty to stall sending requests waiting for a new connection to be opened</li>
|
||||
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
|
||||
* </ul>
|
||||
|
@ -48,10 +47,8 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
|
|||
* @see RandomConnectionPool
|
||||
*/
|
||||
@ManagedObject
|
||||
public class RoundRobinConnectionPool extends IndexedConnectionPool
|
||||
public class RoundRobinConnectionPool extends MultiplexConnectionPool
|
||||
{
|
||||
private final AtomicInteger offset = new AtomicInteger();
|
||||
|
||||
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
|
||||
{
|
||||
this(destination, maxConnections, requester, 1);
|
||||
|
@ -59,17 +56,11 @@ public class RoundRobinConnectionPool extends IndexedConnectionPool
|
|||
|
||||
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
|
||||
{
|
||||
super(destination, maxConnections, requester, maxMultiplex);
|
||||
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
|
||||
// If there are queued requests and connections get
|
||||
// closed due to idle timeout or overuse, we want to
|
||||
// aggressively try to open new connections to replace
|
||||
// those that were closed to process queued requests.
|
||||
setMaximizeConnections(true);
|
||||
}
|
||||
|
||||
@Override
|
||||
protected int getIndex(int maxConnections)
|
||||
{
|
||||
return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
|
||||
}
|
||||
}
|
||||
|
|
|
@ -0,0 +1,139 @@
|
|||
//
|
||||
// ========================================================================
|
||||
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
|
||||
// ------------------------------------------------------------------------
|
||||
// All rights reserved. This program and the accompanying materials
|
||||
// are made available under the terms of the Eclipse Public License v1.0
|
||||
// and Apache License v2.0 which accompanies this distribution.
|
||||
//
|
||||
// The Eclipse Public License is available at
|
||||
// http://www.eclipse.org/legal/epl-v10.html
|
||||
//
|
||||
// The Apache License v2.0 is available at
|
||||
// http://www.opensource.org/licenses/apache2.0.php
|
||||
//
|
||||
// You may elect to redistribute this code under either of these licenses.
|
||||
// ========================================================================
|
||||
//
|
||||
|
||||
package org.eclipse.jetty.util;
|
||||
|
||||
import java.util.concurrent.atomic.LongAdder;
|
||||
|
||||
import org.openjdk.jmh.annotations.Benchmark;
|
||||
import org.openjdk.jmh.annotations.Param;
|
||||
import org.openjdk.jmh.annotations.Scope;
|
||||
import org.openjdk.jmh.annotations.Setup;
|
||||
import org.openjdk.jmh.annotations.State;
|
||||
import org.openjdk.jmh.annotations.TearDown;
|
||||
import org.openjdk.jmh.infra.Blackhole;
|
||||
import org.openjdk.jmh.results.format.ResultFormatType;
|
||||
import org.openjdk.jmh.runner.Runner;
|
||||
import org.openjdk.jmh.runner.RunnerException;
|
||||
import org.openjdk.jmh.runner.options.Options;
|
||||
import org.openjdk.jmh.runner.options.OptionsBuilder;
|
||||
|
||||
@State(Scope.Benchmark)
|
||||
public class PoolStrategyBenchmark
|
||||
{
|
||||
private Pool<String> pool;
|
||||
|
||||
@Param({
|
||||
"Pool.Linear",
|
||||
"Pool.Random",
|
||||
"Pool.RoundRobin",
|
||||
"Pool.ThreadId",
|
||||
})
|
||||
public static String POOL_TYPE;
|
||||
|
||||
@Param({
|
||||
"false",
|
||||
"true",
|
||||
})
|
||||
public static boolean CACHE;
|
||||
|
||||
@Param({
|
||||
"4",
|
||||
"16"
|
||||
})
|
||||
public static int SIZE;
|
||||
|
||||
private static final LongAdder misses = new LongAdder();
|
||||
private static final LongAdder hits = new LongAdder();
|
||||
private static final LongAdder total = new LongAdder();
|
||||
|
||||
@Setup
|
||||
public void setUp() throws Exception
|
||||
{
|
||||
misses.reset();
|
||||
|
||||
switch (POOL_TYPE)
|
||||
{
|
||||
case "Pool.Linear" :
|
||||
pool = new Pool<>(Pool.StrategyType.FIRST, SIZE, CACHE);
|
||||
break;
|
||||
case "Pool.Random" :
|
||||
pool = new Pool<>(Pool.StrategyType.RANDOM, SIZE, CACHE);
|
||||
break;
|
||||
case "Pool.ThreadId" :
|
||||
pool = new Pool<>(Pool.StrategyType.THREAD_ID, SIZE, CACHE);
|
||||
break;
|
||||
case "Pool.RoundRobin" :
|
||||
pool = new Pool<>(Pool.StrategyType.ROUND_ROBIN, SIZE, CACHE);
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
for (int i = 0; i < SIZE; i++)
|
||||
{
|
||||
pool.reserve(1).enable(Integer.toString(i), false);
|
||||
}
|
||||
}
|
||||
|
||||
@TearDown
|
||||
public void tearDown()
|
||||
{
|
||||
System.err.printf("%nMISSES = %d (%d%%)%n", misses.longValue(), 100 * misses.longValue() / (hits.longValue() + misses.longValue()));
|
||||
System.err.printf("AVERAGE = %d%n", total.longValue() / hits.longValue());
|
||||
pool.close();
|
||||
pool = null;
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
public void testAcquireReleasePoolWithStrategy()
|
||||
{
|
||||
// Now really benchmark the strategy we are interested in
|
||||
Pool<String>.Entry entry = pool.acquire();
|
||||
if (entry == null || entry.isIdle())
|
||||
{
|
||||
misses.increment();
|
||||
Blackhole.consumeCPU(20);
|
||||
return;
|
||||
}
|
||||
// do some work
|
||||
hits.increment();
|
||||
total.add(Long.parseLong(entry.getPooled()));
|
||||
Blackhole.consumeCPU(entry.getPooled().hashCode() % 20);
|
||||
|
||||
// release the entry
|
||||
entry.release();
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws RunnerException
|
||||
{
|
||||
Options opt = new OptionsBuilder()
|
||||
.include(PoolStrategyBenchmark.class.getSimpleName())
|
||||
.warmupIterations(3)
|
||||
.measurementIterations(3)
|
||||
.forks(1)
|
||||
.threads(8)
|
||||
.resultFormat(ResultFormatType.JSON)
|
||||
.result("/tmp/poolStrategy-" + System.currentTimeMillis() + ".json")
|
||||
// .addProfiler(GCProfiler.class)
|
||||
.build();
|
||||
|
||||
new Runner(opt).run();
|
||||
}
|
||||
}
|
|
@ -26,26 +26,20 @@ import java.util.Collections;
|
|||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.CopyOnWriteArrayList;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.function.Function;
|
||||
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||
import org.eclipse.jetty.util.log.Log;
|
||||
import org.eclipse.jetty.util.log.Logger;
|
||||
import org.eclipse.jetty.util.thread.Locker;
|
||||
|
||||
/**
|
||||
* A fast pool of objects, with optional support for
|
||||
* multiplexing, max usage count and thread-local caching.
|
||||
* <p>
|
||||
* The thread-local caching mechanism is about remembering up to N previously
|
||||
* used entries into a thread-local single-threaded collection.
|
||||
* When that collection is not empty, its entries are removed one by one
|
||||
* during acquisition until an entry that can be acquired is found.
|
||||
* This can greatly speed up acquisition when both the acquisition and the
|
||||
* release of the entries is done on the same thread as this avoids iterating
|
||||
* the global, thread-safe collection of entries.
|
||||
* </p>
|
||||
* multiplexing, max usage count and several optimized strategies plus
|
||||
* an optional {@link ThreadLocal} cache of the last release entry.
|
||||
* <p>
|
||||
* When the method {@link #close()} is called, all {@link Closeable}s in the pool
|
||||
* are also closed.
|
||||
|
@ -56,38 +50,88 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
{
|
||||
private static final Logger LOGGER = Log.getLogger(Pool.class);
|
||||
|
||||
private final List<Entry> sharedList = new CopyOnWriteArrayList<>();
|
||||
private final List<Entry> entries = new CopyOnWriteArrayList<>();
|
||||
|
||||
private final int maxEntries;
|
||||
private final AtomicInteger pending = new AtomicInteger();
|
||||
private final StrategyType strategyType;
|
||||
|
||||
/*
|
||||
* The cache is used to avoid hammering on the first index of the entry list.
|
||||
* Caches can become poisoned (i.e.: containing entries that are in use) when
|
||||
* the release isn't done by the acquiring thread or when the entry pool is
|
||||
* undersized compared to the load applied on it.
|
||||
* When an entry can't be found in the cache, the global list is iterated
|
||||
* normally so the cache has no visible effect besides performance.
|
||||
* with the configured strategy so the cache has no visible effect besides performance.
|
||||
*/
|
||||
private final ThreadLocal<List<Entry>> cache;
|
||||
private final Locker locker = new Locker();
|
||||
private final int maxEntries;
|
||||
private final int cacheSize;
|
||||
private final AtomicInteger pending = new AtomicInteger();
|
||||
private final ThreadLocal<Entry> cache;
|
||||
private final AtomicInteger nextIndex;
|
||||
private volatile boolean closed;
|
||||
private volatile int maxMultiplex = 1;
|
||||
private volatile int maxUsageCount = -1;
|
||||
|
||||
/**
|
||||
* Construct a Pool with the specified thread-local cache size.
|
||||
*
|
||||
* @param maxEntries the maximum amount of entries that the pool will accept.
|
||||
* @param cacheSize the thread-local cache size. A value less than 1 means the cache is disabled.
|
||||
* The type of the strategy to use for the pool.
|
||||
* The strategy primarily determines where iteration over the pool entries begins.
|
||||
*/
|
||||
public Pool(int maxEntries, int cacheSize)
|
||||
public enum StrategyType
|
||||
{
|
||||
/**
|
||||
* A strategy that looks for an entry always starting from the first entry.
|
||||
* It will favour the early entries in the pool, but may contend on them more.
|
||||
*/
|
||||
FIRST,
|
||||
|
||||
/**
|
||||
* A strategy that looks for an entry by iterating from a random starting
|
||||
* index. No entries are favoured and contention is reduced.
|
||||
*/
|
||||
RANDOM,
|
||||
|
||||
/**
|
||||
* A strategy that uses the {@link Thread#getId()} of the current thread
|
||||
* to select a starting point for an entry search. Whilst not as performant as
|
||||
* using the {@link ThreadLocal} cache, it may be suitable when the pool is substantially smaller
|
||||
* than the number of available threads.
|
||||
* No entries are favoured and contention is reduced.
|
||||
*/
|
||||
THREAD_ID,
|
||||
|
||||
/**
|
||||
* A strategy that looks for an entry by iterating from a starting point
|
||||
* that is incremented on every search. This gives similar results to the
|
||||
* random strategy but with more predictable behaviour.
|
||||
* No entries are favoured and contention is reduced.
|
||||
*/
|
||||
ROUND_ROBIN,
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a Pool with a specified lookup strategy and no
|
||||
* {@link ThreadLocal} cache.
|
||||
*
|
||||
* @param strategyType The strategy to used for looking up entries.
|
||||
* @param maxEntries the maximum amount of entries that the pool will accept.
|
||||
*/
|
||||
public Pool(StrategyType strategyType, int maxEntries)
|
||||
{
|
||||
this(strategyType, maxEntries, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Construct a Pool with the specified thread-local cache size and
|
||||
* an optional {@link ThreadLocal} cache.
|
||||
* @param strategyType The strategy to used for looking up entries.
|
||||
* @param maxEntries the maximum amount of entries that the pool will accept.
|
||||
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
|
||||
*/
|
||||
public Pool(StrategyType strategyType, int maxEntries, boolean cache)
|
||||
{
|
||||
this.maxEntries = maxEntries;
|
||||
this.cacheSize = cacheSize;
|
||||
if (cacheSize > 0)
|
||||
this.cache = ThreadLocal.withInitial(() -> new ArrayList<Entry>(cacheSize));
|
||||
else
|
||||
this.cache = null;
|
||||
this.strategyType = strategyType;
|
||||
this.cache = cache ? new ThreadLocal<>() : null;
|
||||
nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
|
||||
}
|
||||
|
||||
public int getReservedCount()
|
||||
|
@ -97,12 +141,12 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
|
||||
public int getIdleCount()
|
||||
{
|
||||
return (int)sharedList.stream().filter(Entry::isIdle).count();
|
||||
return (int)entries.stream().filter(Entry::isIdle).count();
|
||||
}
|
||||
|
||||
public int getInUseCount()
|
||||
{
|
||||
return (int)sharedList.stream().filter(Entry::isInUse).count();
|
||||
return (int)entries.stream().filter(Entry::isInUse).count();
|
||||
}
|
||||
|
||||
public int getMaxEntries()
|
||||
|
@ -153,7 +197,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (closed)
|
||||
return null;
|
||||
|
||||
int space = maxEntries - sharedList.size();
|
||||
int space = maxEntries - entries.size();
|
||||
if (space <= 0)
|
||||
return null;
|
||||
|
||||
|
@ -165,17 +209,18 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
pending.incrementAndGet();
|
||||
|
||||
Entry entry = new Entry();
|
||||
sharedList.add(entry);
|
||||
entries.add(entry);
|
||||
return entry;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism.
|
||||
*
|
||||
* @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
|
||||
* @param idx the index of the entry to acquire.
|
||||
* @return the specified entry or null if there is none at the specified index or if it is not available.
|
||||
*/
|
||||
@Deprecated
|
||||
public Entry acquireAt(int idx)
|
||||
{
|
||||
if (closed)
|
||||
|
@ -183,7 +228,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
|
||||
try
|
||||
{
|
||||
Entry entry = sharedList.get(idx);
|
||||
Entry entry = entries.get(idx);
|
||||
if (entry.tryAcquire())
|
||||
return entry;
|
||||
}
|
||||
|
@ -204,27 +249,54 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (closed)
|
||||
return null;
|
||||
|
||||
// first check the thread-local cache
|
||||
int size = entries.size();
|
||||
if (size == 0)
|
||||
return null;
|
||||
|
||||
if (cache != null)
|
||||
{
|
||||
List<Entry> cachedList = cache.get();
|
||||
while (!cachedList.isEmpty())
|
||||
{
|
||||
Entry cachedEntry = cachedList.remove(cachedList.size() - 1);
|
||||
if (cachedEntry.tryAcquire())
|
||||
return cachedEntry;
|
||||
}
|
||||
}
|
||||
|
||||
// then iterate the shared list
|
||||
for (Entry entry : sharedList)
|
||||
{
|
||||
if (entry.tryAcquire())
|
||||
Pool<T>.Entry entry = cache.get();
|
||||
if (entry != null && entry.tryAcquire())
|
||||
return entry;
|
||||
}
|
||||
|
||||
int index = startIndex(size);
|
||||
|
||||
for (int tries = size; tries-- > 0;)
|
||||
{
|
||||
try
|
||||
{
|
||||
Pool<T>.Entry entry = entries.get(index);
|
||||
if (entry != null && entry.tryAcquire())
|
||||
return entry;
|
||||
}
|
||||
catch (IndexOutOfBoundsException e)
|
||||
{
|
||||
LOGGER.ignore(e);
|
||||
size = entries.size();
|
||||
}
|
||||
index = (index + 1) % size;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private int startIndex(int size)
|
||||
{
|
||||
switch (strategyType)
|
||||
{
|
||||
case FIRST:
|
||||
return 0;
|
||||
case RANDOM:
|
||||
return ThreadLocalRandom.current().nextInt(size);
|
||||
case ROUND_ROBIN:
|
||||
return nextIndex.getAndUpdate(c -> Math.max(0, c + 1)) % size;
|
||||
case THREAD_ID:
|
||||
return (int)(Thread.currentThread().getId() % size);
|
||||
default:
|
||||
throw new IllegalArgumentException("Unknown strategy type: " + strategyType);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility method to acquire an entry from the pool,
|
||||
* reserving and creating a new entry if necessary.
|
||||
|
@ -278,17 +350,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
if (closed)
|
||||
return false;
|
||||
|
||||
// first mark it as unused
|
||||
boolean reusable = entry.tryRelease();
|
||||
|
||||
// then cache the released entry
|
||||
if (cache != null && reusable)
|
||||
{
|
||||
List<Entry> cachedList = cache.get();
|
||||
if (cachedList.size() < cacheSize)
|
||||
cachedList.add(entry);
|
||||
}
|
||||
return reusable;
|
||||
boolean released = entry.tryRelease();
|
||||
if (released && cache != null)
|
||||
cache.set(entry);
|
||||
return released;
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -309,12 +374,9 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
return false;
|
||||
}
|
||||
|
||||
boolean removed = sharedList.remove(entry);
|
||||
if (!removed)
|
||||
{
|
||||
if (LOGGER.isDebugEnabled())
|
||||
LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry);
|
||||
}
|
||||
boolean removed = entries.remove(entry);
|
||||
if (!removed && LOGGER.isDebugEnabled())
|
||||
LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry);
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
@ -331,8 +393,8 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
try (Locker.Lock l = locker.lock())
|
||||
{
|
||||
closed = true;
|
||||
copy = new ArrayList<>(sharedList);
|
||||
sharedList.clear();
|
||||
copy = new ArrayList<>(entries);
|
||||
entries.clear();
|
||||
}
|
||||
|
||||
// iterate the copy and close its entries
|
||||
|
@ -345,29 +407,30 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
|
||||
public int size()
|
||||
{
|
||||
return sharedList.size();
|
||||
return entries.size();
|
||||
}
|
||||
|
||||
public Collection<Entry> values()
|
||||
{
|
||||
return Collections.unmodifiableCollection(sharedList);
|
||||
return Collections.unmodifiableCollection(entries);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void dump(Appendable out, String indent) throws IOException
|
||||
{
|
||||
Dumpable.dumpObjects(out, indent, this);
|
||||
Dumpable.dumpObjects(out, indent, this,
|
||||
new DumpableCollection("entries", entries));
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[size=%d closed=%s entries=%s]",
|
||||
return String.format("%s@%x[size=%d closed=%s pending=%d]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
sharedList.size(),
|
||||
entries.size(),
|
||||
closed,
|
||||
sharedList);
|
||||
pending.get());
|
||||
}
|
||||
|
||||
public class Entry
|
||||
|
@ -500,6 +563,13 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
return !(overUsed && newMultiplexingCount == 0);
|
||||
}
|
||||
|
||||
public boolean isOverUsed()
|
||||
{
|
||||
int currentMaxUsageCount = maxUsageCount;
|
||||
int usageCount = state.getHi();
|
||||
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
|
||||
}
|
||||
|
||||
/**
|
||||
* Try to mark the entry as removed.
|
||||
* @return true if the entry has to be removed from the containing pool, false otherwise.
|
||||
|
@ -549,12 +619,17 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
|||
public String toString()
|
||||
{
|
||||
long encoded = state.get();
|
||||
return String.format("%s@%x{usage=%d/%d,multiplex=%d/%d,pooled=%s}",
|
||||
int usageCount = AtomicBiInteger.getHi(encoded);
|
||||
int multiplexCount = AtomicBiInteger.getLo(encoded);
|
||||
|
||||
String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE";
|
||||
|
||||
return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
AtomicBiInteger.getHi(encoded),
|
||||
getMaxUsageCount(),
|
||||
AtomicBiInteger.getLo(encoded),
|
||||
state,
|
||||
Math.max(usageCount, 0),
|
||||
Math.max(multiplexCount, 0),
|
||||
getMaxMultiplex(),
|
||||
pooled);
|
||||
}
|
||||
|
|
|
@ -21,17 +21,27 @@ package org.eclipse.jetty.util;
|
|||
import java.io.Closeable;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.params.ParameterizedTest;
|
||||
import org.junit.jupiter.params.provider.MethodSource;
|
||||
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.eclipse.jetty.util.Pool.StrategyType.FIRST;
|
||||
import static org.eclipse.jetty.util.Pool.StrategyType.RANDOM;
|
||||
import static org.eclipse.jetty.util.Pool.StrategyType.ROUND_ROBIN;
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.hamcrest.Matchers.equalTo;
|
||||
import static org.hamcrest.Matchers.greaterThan;
|
||||
import static org.hamcrest.Matchers.is;
|
||||
import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
||||
import static org.hamcrest.Matchers.not;
|
||||
import static org.hamcrest.Matchers.notNullValue;
|
||||
import static org.hamcrest.Matchers.nullValue;
|
||||
import static org.hamcrest.Matchers.sameInstance;
|
||||
|
@ -40,20 +50,27 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
|
|||
|
||||
public class PoolTest
|
||||
{
|
||||
public static Stream<Object[]> cacheSize()
|
||||
|
||||
interface Factory
|
||||
{
|
||||
Pool<String> getPool(int maxSize);
|
||||
}
|
||||
|
||||
public static Stream<Object[]> strategy()
|
||||
{
|
||||
List<Object[]> data = new ArrayList<>();
|
||||
data.add(new Object[]{0});
|
||||
data.add(new Object[]{1});
|
||||
data.add(new Object[]{2});
|
||||
data.add(new Object[]{(Factory)s -> new Pool<>(FIRST, s)});
|
||||
data.add(new Object[]{(Factory)s -> new Pool<>(RANDOM, s)});
|
||||
data.add(new Object[]{(Factory)s -> new Pool<>(FIRST, s, true)});
|
||||
data.add(new Object[]{(Factory)s -> new Pool<>(ROUND_ROBIN, s)});
|
||||
return data.stream();
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testAcquireRelease(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testAcquireRelease(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1,cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
|
@ -94,10 +111,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testRemoveBeforeRelease(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testRemoveBeforeRelease(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
|
@ -107,10 +124,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testCloseBeforeRelease(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testCloseBeforeRelease(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
|
@ -121,10 +138,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMaxPoolSize(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMaxPoolSize(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.reserve(-1), notNullValue());
|
||||
assertThat(pool.size(), is(1));
|
||||
|
@ -133,10 +150,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testReserve(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testReserve(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
|
||||
// Reserve an entry
|
||||
Pool<String>.Entry e1 = pool.reserve(-1);
|
||||
|
@ -196,10 +213,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testReserveMaxPending(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testReserveMaxPending(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
assertThat(pool.reserve(0), nullValue());
|
||||
assertThat(pool.reserve(1), notNullValue());
|
||||
assertThat(pool.reserve(1), nullValue());
|
||||
|
@ -210,20 +227,20 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testReserveNegativeMaxPending(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testReserveNegativeMaxPending(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
assertThat(pool.reserve(-1), notNullValue());
|
||||
assertThat(pool.reserve(-1), notNullValue());
|
||||
assertThat(pool.reserve(-1), nullValue());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testClose(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testClose(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
assertThat(pool.isClosed(), is(false));
|
||||
pool.close();
|
||||
|
@ -235,12 +252,11 @@ public class PoolTest
|
|||
assertThat(pool.reserve(-1), nullValue());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testClosingCloseable(int cacheSize)
|
||||
@Test
|
||||
public void testClosingCloseable()
|
||||
{
|
||||
AtomicBoolean closed = new AtomicBoolean();
|
||||
Pool<Closeable> pool = new Pool<>(1,0);
|
||||
Pool<Closeable> pool = new Pool<>(FIRST, 1);
|
||||
Closeable pooled = () -> closed.set(true);
|
||||
pool.reserve(-1).enable(pooled, false);
|
||||
assertThat(closed.get(), is(false));
|
||||
|
@ -249,10 +265,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testRemove(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
|
@ -264,10 +280,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testValuesSize(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testValuesSize(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.values().isEmpty(), is(true));
|
||||
|
@ -278,10 +294,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testValuesContainsAcquiredEntries(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testValuesContainsAcquiredEntries(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
pool.reserve(-1).enable("bbb", false);
|
||||
|
@ -292,10 +308,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testAcquireAt(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testAcquireAt(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
pool.reserve(-1).enable("bbb", false);
|
||||
|
@ -308,10 +324,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMaxUsageCount(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMaxUsageCount(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
|
@ -331,39 +347,54 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMaxMultiplex(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMaxMultiplex(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
pool.setMaxMultiplex(3);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
pool.reserve(-1).enable("bbb", false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
Pool<String>.Entry e3 = pool.acquire();
|
||||
Pool<String>.Entry e4 = pool.acquire();
|
||||
assertThat(e1.getPooled(), equalTo("aaa"));
|
||||
assertThat(e1, sameInstance(e2));
|
||||
assertThat(e1, sameInstance(e3));
|
||||
assertThat(e4.getPooled(), equalTo("bbb"));
|
||||
assertThat(pool.release(e1), is(true));
|
||||
Pool<String>.Entry e5 = pool.acquire();
|
||||
assertThat(e2, sameInstance(e5));
|
||||
Pool<String>.Entry e6 = pool.acquire();
|
||||
assertThat(e4, sameInstance(e6));
|
||||
Map<String, AtomicInteger> counts = new HashMap<>();
|
||||
AtomicInteger a = new AtomicInteger();
|
||||
AtomicInteger b = new AtomicInteger();
|
||||
counts.put("a", a);
|
||||
counts.put("b", b);
|
||||
pool.reserve(-1).enable("a", false);
|
||||
pool.reserve(-1).enable("b", false);
|
||||
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
|
||||
assertThat(a.get(), greaterThan(0));
|
||||
assertThat(a.get(), lessThanOrEqualTo(3));
|
||||
assertThat(b.get(), greaterThan(0));
|
||||
assertThat(b.get(), lessThanOrEqualTo(3));
|
||||
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
counts.get(pool.acquire().getPooled()).incrementAndGet();
|
||||
|
||||
assertThat(a.get(), is(3));
|
||||
assertThat(b.get(), is(3));
|
||||
|
||||
assertNull(pool.acquire());
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testRemoveMultiplexed(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testRemoveMultiplexed(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(e1, notNullValue());
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(e2, notNullValue());
|
||||
assertThat(e2, sameInstance(e1));
|
||||
assertThat(e2.getUsageCount(), is(2));
|
||||
|
||||
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
|
||||
|
||||
assertThat(pool.remove(e1), is(false));
|
||||
|
@ -380,10 +411,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMultiplexRemoveThenAcquireThenReleaseRemove(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
|
@ -398,10 +429,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testNonMultiplexRemoveAfterAcquire(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
|
@ -411,10 +442,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMultiplexRemoveAfterAcquire(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexRemoveAfterAcquire(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
||||
|
@ -436,10 +467,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testReleaseThenRemoveNonEnabledEntry(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testReleaseThenRemoveNonEnabledEntry(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<String>.Entry e = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.release(e), is(false));
|
||||
|
@ -449,10 +480,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testRemoveNonEnabledEntry(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testRemoveNonEnabledEntry(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
Pool<String>.Entry e = pool.reserve(-1);
|
||||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.remove(e), is(true));
|
||||
|
@ -460,10 +491,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMultiplexMaxUsageReachedAcquireThenRemove(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
@ -479,12 +510,12 @@ public class PoolTest
|
|||
assertThat(pool.remove(e0), is(true));
|
||||
assertThat(pool.size(), is(0));
|
||||
}
|
||||
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(3);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
@ -506,10 +537,10 @@ public class PoolTest
|
|||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testUsageCountAfterReachingMaxMultiplexLimit(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(1, cacheSize);
|
||||
Pool<String> pool = factory.getPool(1);
|
||||
pool.setMaxMultiplex(2);
|
||||
pool.setMaxUsageCount(10);
|
||||
pool.reserve(-1).enable("aaa", false);
|
||||
|
@ -517,25 +548,25 @@ public class PoolTest
|
|||
Pool<String>.Entry e1 = pool.acquire();
|
||||
assertThat(e1.getUsageCount(), is(1));
|
||||
Pool<String>.Entry e2 = pool.acquire();
|
||||
assertThat(e2, sameInstance(e1));
|
||||
assertThat(e1.getUsageCount(), is(2));
|
||||
assertThat(pool.acquire(), nullValue());
|
||||
assertThat(e1.getUsageCount(), is(2));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testConfigLimits(int cacheSize)
|
||||
@Test
|
||||
public void testConfigLimits()
|
||||
{
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(-1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxUsageCount(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(0));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(-1));
|
||||
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxUsageCount(0));
|
||||
}
|
||||
|
||||
@ParameterizedTest
|
||||
@MethodSource(value = "cacheSize")
|
||||
public void testAcquireWithCreator(int cacheSize)
|
||||
@MethodSource(value = "strategy")
|
||||
public void testAcquireWithCreator(Factory factory)
|
||||
{
|
||||
Pool<String> pool = new Pool<>(2, cacheSize);
|
||||
Pool<String> pool = factory.getPool(2);
|
||||
|
||||
assertThat(pool.size(), is(0));
|
||||
assertThat(pool.acquire(e -> null), nullValue());
|
||||
|
@ -590,6 +621,67 @@ public class PoolTest
|
|||
assertThat(pool.size(), is(1));
|
||||
assertThat(pool.getReservedCount(), is(0));
|
||||
assertThat(pool.getInUseCount(), is(1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRoundRobinStrategy()
|
||||
{
|
||||
Pool<AtomicInteger> pool = new Pool<>(ROUND_ROBIN, 4);
|
||||
|
||||
Pool<AtomicInteger>.Entry e1 = pool.acquire(e -> new AtomicInteger());
|
||||
Pool<AtomicInteger>.Entry e2 = pool.acquire(e -> new AtomicInteger());
|
||||
Pool<AtomicInteger>.Entry e3 = pool.acquire(e -> new AtomicInteger());
|
||||
Pool<AtomicInteger>.Entry e4 = pool.acquire(e -> new AtomicInteger());
|
||||
assertNull(pool.acquire(e -> new AtomicInteger()));
|
||||
|
||||
pool.release(e1);
|
||||
pool.release(e2);
|
||||
pool.release(e3);
|
||||
pool.release(e4);
|
||||
|
||||
Pool<AtomicInteger>.Entry last = null;
|
||||
for (int i = 0; i < 8; i++)
|
||||
{
|
||||
Pool<AtomicInteger>.Entry e = pool.acquire();
|
||||
if (last != null)
|
||||
assertThat(e, not(sameInstance(last)));
|
||||
e.getPooled().incrementAndGet();
|
||||
pool.release(e);
|
||||
last = e;
|
||||
}
|
||||
|
||||
assertThat(e1.getPooled().get(), is(2));
|
||||
assertThat(e2.getPooled().get(), is(2));
|
||||
assertThat(e3.getPooled().get(), is(2));
|
||||
assertThat(e4.getPooled().get(), is(2));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRandomStrategy()
|
||||
{
|
||||
Pool<AtomicInteger> pool = new Pool<>(RANDOM, 4);
|
||||
|
||||
Pool<AtomicInteger>.Entry e1 = pool.acquire(e -> new AtomicInteger());
|
||||
Pool<AtomicInteger>.Entry e2 = pool.acquire(e -> new AtomicInteger());
|
||||
Pool<AtomicInteger>.Entry e3 = pool.acquire(e -> new AtomicInteger());
|
||||
Pool<AtomicInteger>.Entry e4 = pool.acquire(e -> new AtomicInteger());
|
||||
assertNull(pool.acquire(e -> new AtomicInteger()));
|
||||
|
||||
pool.release(e1);
|
||||
pool.release(e2);
|
||||
pool.release(e3);
|
||||
pool.release(e4);
|
||||
|
||||
for (int i = 0; i < 400; i++)
|
||||
{
|
||||
Pool<AtomicInteger>.Entry e = pool.acquire();
|
||||
e.getPooled().incrementAndGet();
|
||||
pool.release(e);
|
||||
}
|
||||
|
||||
assertThat(e1.getPooled().get(), greaterThan(10));
|
||||
assertThat(e2.getPooled().get(), greaterThan(10));
|
||||
assertThat(e3.getPooled().get(), greaterThan(10));
|
||||
assertThat(e4.getPooled().get(), greaterThan(10));
|
||||
}
|
||||
}
|
||||
|
|
|
@ -101,7 +101,7 @@ public class XmlConfiguration
|
|||
};
|
||||
private static final Iterable<ConfigurationProcessorFactory> __factoryLoader = ServiceLoader.load(ConfigurationProcessorFactory.class);
|
||||
private static final Pool<ConfigurationParser> __parsers =
|
||||
new Pool<>(Math.min(8, Runtime.getRuntime().availableProcessors()),1);
|
||||
new Pool<>(Pool.StrategyType.THREAD_ID, Math.min(8, Runtime.getRuntime().availableProcessors()));
|
||||
public static final Comparator<Executable> EXECUTABLE_COMPARATOR = (e1, e2) ->
|
||||
{
|
||||
// Favour methods with less parameters
|
||||
|
|
Loading…
Reference in New Issue