Merge remote-tracking branch 'origin/jetty-9.4.x' into jetty-10.0.x

Signed-off-by: gregw <gregw@webtide.com>
This commit is contained in:
gregw 2020-09-16 18:31:51 +02:00
commit b7a4c36286
12 changed files with 531 additions and 320 deletions

View File

@ -32,6 +32,7 @@ import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
@ -40,7 +41,7 @@ import org.slf4j.LoggerFactory;
import static java.util.stream.Collectors.toCollection;
@ManagedObject
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable
public abstract class AbstractConnectionPool extends ContainerLifeCycle implements ConnectionPool, Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionPool.class);
@ -50,17 +51,22 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private boolean maximizeConnections;
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}
protected AbstractConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
this.destination = destination;
this.requester = requester;
@SuppressWarnings("unchecked")
Pool<Connection> pool = destination.getBean(Pool.class);
if (pool == null)
{
pool = new Pool<>(maxConnections, cache ? 1 : 0);
destination.addBean(pool);
}
this.pool = pool;
addBean(pool);
}
@Override
protected void doStop() throws Exception
{
pool.close();
}
@Override

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -27,12 +29,17 @@ public class DuplexConnectionPool extends AbstractConnectionPool
{
public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
this(destination, maxConnections, true, requester);
this(destination, maxConnections, false, requester);
}
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
super(destination, maxConnections, cache, requester);
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester);
}
public DuplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester)
{
super(destination, pool, requester);
}
@Override

View File

@ -1,79 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
* <p>The algorithm that decides the index value is decided by subclasses.</p>
* <p>To acquire a connection, this class obtains the index value and attempts
* to activate the pool entry at that index.
* If this activation fails, another attempt to activate an alternative pool
* entry is performed, to avoid stalling connection acquisition if there is
* an available entry at a different index.</p>
*/
@ManagedObject
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = LoggerFactory.getLogger(IndexedConnectionPool.class);
private final Pool<Connection> pool;
public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
/**
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
* used to attempt to acquire the connection at that index in the pool.</p>
*
* @param maxConnections the upper bound of the index (exclusive)
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
*/
protected abstract int getIndex(int maxConnections);
@Override
protected Connection activate()
{
int index = getIndex(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activating at index={} entry={}", index, entry);
if (entry == null)
{
entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activating alternative entry={}", entry);
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.component.LifeCycle;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -40,40 +41,16 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
public LeakTrackingConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super((HttpDestination)destination, maxConnections, requester);
start();
}
private void start()
{
try
{
leakDetector.start();
}
catch (Exception x)
{
throw new RuntimeException(x);
}
addBean(leakDetector);
}
@Override
public void close()
{
stop();
LifeCycle.stop(this);
super.close();
}
private void stop()
{
try
{
leakDetector.stop();
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
@Override
protected void acquired(Connection connection)
{

View File

@ -18,7 +18,9 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -27,12 +29,17 @@ public class MultiplexConnectionPool extends AbstractConnectionPool implements C
{
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
this(destination, maxConnections, true, requester, maxMultiplex);
this(destination, maxConnections, false, requester, maxMultiplex);
}
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester);
this(destination, new Pool<>(Pool.StrategyType.FIRST, maxConnections, cache), requester, maxMultiplex);
}
public MultiplexConnectionPool(HttpDestination destination, Pool<Connection> pool, Callback requester, int maxMultiplex)
{
super(destination, pool, requester);
setMaxMultiplex(maxMultiplex);
}

View File

@ -18,26 +18,19 @@
package org.eclipse.jetty.client;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>An indexed {@link ConnectionPool} that provides connections
* <p>A {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
public class RandomConnectionPool extends MultiplexConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
}
@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
super(destination, new Pool<>(Pool.StrategyType.RANDOM, maxConnections, false), requester, maxMultiplex);
}
}

View File

@ -18,9 +18,8 @@
package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
@ -30,7 +29,7 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* <li>connections may be closed by the client or by the server, so it would be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
@ -48,10 +47,8 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
* @see RandomConnectionPool
*/
@ManagedObject
public class RoundRobinConnectionPool extends IndexedConnectionPool
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private final AtomicInteger offset = new AtomicInteger();
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
this(destination, maxConnections, requester, 1);
@ -59,17 +56,11 @@ public class RoundRobinConnectionPool extends IndexedConnectionPool
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
super(destination, new Pool<>(Pool.StrategyType.ROUND_ROBIN, maxConnections, false), requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
setMaximizeConnections(true);
}
@Override
protected int getIndex(int maxConnections)
{
return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
}
}

View File

@ -572,7 +572,10 @@ public class IdleTimeoutTest extends AbstractTest
}
}
});
connector.setIdleTimeout(2 * delay);
// The timeout is going to be reset each time a DATA frame is fully consumed, hence
// every 2 loops in the above servlet. So the IdleTimeout must be greater than (2 * delay)
// to make sure it does not fire spuriously.
connector.setIdleTimeout(3 * delay);
Session session = newClient(new Session.Listener.Adapter());
MetaData.Request metaData = newRequest("POST", HttpFields.EMPTY);

View File

@ -0,0 +1,139 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.util.concurrent.atomic.LongAdder;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.results.format.ResultFormatType;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@State(Scope.Benchmark)
public class PoolStrategyBenchmark
{
private Pool<String> pool;
@Param({
"Pool.Linear",
"Pool.Random",
"Pool.RoundRobin",
"Pool.ThreadId",
})
public static String POOL_TYPE;
@Param({
"false",
"true",
})
public static boolean CACHE;
@Param({
"4",
"16"
})
public static int SIZE;
private static final LongAdder misses = new LongAdder();
private static final LongAdder hits = new LongAdder();
private static final LongAdder total = new LongAdder();
@Setup
public void setUp() throws Exception
{
misses.reset();
switch (POOL_TYPE)
{
case "Pool.Linear" :
pool = new Pool<>(Pool.StrategyType.FIRST, SIZE, CACHE);
break;
case "Pool.Random" :
pool = new Pool<>(Pool.StrategyType.RANDOM, SIZE, CACHE);
break;
case "Pool.ThreadId" :
pool = new Pool<>(Pool.StrategyType.THREAD_ID, SIZE, CACHE);
break;
case "Pool.RoundRobin" :
pool = new Pool<>(Pool.StrategyType.ROUND_ROBIN, SIZE, CACHE);
break;
default:
throw new IllegalStateException();
}
for (int i = 0; i < SIZE; i++)
{
pool.reserve(1).enable(Integer.toString(i), false);
}
}
@TearDown
public void tearDown()
{
System.err.printf("%nMISSES = %d (%d%%)%n", misses.longValue(), 100 * misses.longValue() / (hits.longValue() + misses.longValue()));
System.err.printf("AVERAGE = %d%n", total.longValue() / hits.longValue());
pool.close();
pool = null;
}
@Benchmark
public void testAcquireReleasePoolWithStrategy()
{
// Now really benchmark the strategy we are interested in
Pool<String>.Entry entry = pool.acquire();
if (entry == null || entry.isIdle())
{
misses.increment();
Blackhole.consumeCPU(20);
return;
}
// do some work
hits.increment();
total.add(Long.parseLong(entry.getPooled()));
Blackhole.consumeCPU(entry.getPooled().hashCode() % 20);
// release the entry
entry.release();
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(PoolStrategyBenchmark.class.getSimpleName())
.warmupIterations(3)
.measurementIterations(3)
.forks(1)
.threads(8)
.resultFormat(ResultFormatType.JSON)
.result("/tmp/poolStrategy-" + System.currentTimeMillis() + ".json")
// .addProfiler(GCProfiler.class)
.build();
new Runner(opt).run();
}
}

View File

@ -26,26 +26,20 @@ import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A fast pool of objects, with optional support for
* multiplexing, max usage count and thread-local caching.
* <p>
* The thread-local caching mechanism is about remembering up to N previously
* used entries into a thread-local single-threaded collection.
* When that collection is not empty, its entries are removed one by one
* during acquisition until an entry that can be acquired is found.
* This can greatly speed up acquisition when both the acquisition and the
* release of the entries is done on the same thread as this avoids iterating
* the global, thread-safe collection of entries.
* </p>
* multiplexing, max usage count and several optimized strategies plus
* an optional {@link ThreadLocal} cache of the last release entry.
* <p>
* When the method {@link #close()} is called, all {@link Closeable}s in the pool
* are also closed.
@ -56,38 +50,88 @@ public class Pool<T> implements AutoCloseable, Dumpable
{
private static final Logger LOGGER = LoggerFactory.getLogger(Pool.class);
private final List<Entry> sharedList = new CopyOnWriteArrayList<>();
private final List<Entry> entries = new CopyOnWriteArrayList<>();
private final int maxEntries;
private final AtomicInteger pending = new AtomicInteger();
private final StrategyType strategyType;
/*
* The cache is used to avoid hammering on the first index of the entry list.
* Caches can become poisoned (i.e.: containing entries that are in use) when
* the release isn't done by the acquiring thread or when the entry pool is
* undersized compared to the load applied on it.
* When an entry can't be found in the cache, the global list is iterated
* normally so the cache has no visible effect besides performance.
* with the configured strategy so the cache has no visible effect besides performance.
*/
private final ThreadLocal<List<Entry>> cache;
private final AutoLock lock = new AutoLock();
private final int maxEntries;
private final int cacheSize;
private final AtomicInteger pending = new AtomicInteger();
private final ThreadLocal<Entry> cache;
private final AtomicInteger nextIndex;
private volatile boolean closed;
private volatile int maxMultiplex = 1;
private volatile int maxUsageCount = -1;
/**
* Construct a Pool with the specified thread-local cache size.
*
* @param maxEntries the maximum amount of entries that the pool will accept.
* @param cacheSize the thread-local cache size. A value less than 1 means the cache is disabled.
* The type of the strategy to use for the pool.
* The strategy primarily determines where iteration over the pool entries begins.
*/
public Pool(int maxEntries, int cacheSize)
public enum StrategyType
{
/**
* A strategy that looks for an entry always starting from the first entry.
* It will favour the early entries in the pool, but may contend on them more.
*/
FIRST,
/**
* A strategy that looks for an entry by iterating from a random starting
* index. No entries are favoured and contention is reduced.
*/
RANDOM,
/**
* A strategy that uses the {@link Thread#getId()} of the current thread
* to select a starting point for an entry search. Whilst not as performant as
* using the {@link ThreadLocal} cache, it may be suitable when the pool is substantially smaller
* than the number of available threads.
* No entries are favoured and contention is reduced.
*/
THREAD_ID,
/**
* A strategy that looks for an entry by iterating from a starting point
* that is incremented on every search. This gives similar results to the
* random strategy but with more predictable behaviour.
* No entries are favoured and contention is reduced.
*/
ROUND_ROBIN,
}
/**
* Construct a Pool with a specified lookup strategy and no
* {@link ThreadLocal} cache.
*
* @param strategyType The strategy to used for looking up entries.
* @param maxEntries the maximum amount of entries that the pool will accept.
*/
public Pool(StrategyType strategyType, int maxEntries)
{
this(strategyType, maxEntries, false);
}
/**
* Construct a Pool with the specified thread-local cache size and
* an optional {@link ThreadLocal} cache.
* @param strategyType The strategy to used for looking up entries.
* @param maxEntries the maximum amount of entries that the pool will accept.
* @param cache True if a {@link ThreadLocal} cache should be used to try the most recently released entry.
*/
public Pool(StrategyType strategyType, int maxEntries, boolean cache)
{
this.maxEntries = maxEntries;
this.cacheSize = cacheSize;
if (cacheSize > 0)
this.cache = ThreadLocal.withInitial(() -> new ArrayList<Entry>(cacheSize));
else
this.cache = null;
this.strategyType = strategyType;
this.cache = cache ? new ThreadLocal<>() : null;
nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
}
public int getReservedCount()
@ -97,12 +141,12 @@ public class Pool<T> implements AutoCloseable, Dumpable
public int getIdleCount()
{
return (int)sharedList.stream().filter(Entry::isIdle).count();
return (int)entries.stream().filter(Entry::isIdle).count();
}
public int getInUseCount()
{
return (int)sharedList.stream().filter(Entry::isInUse).count();
return (int)entries.stream().filter(Entry::isInUse).count();
}
public int getMaxEntries()
@ -153,7 +197,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (closed)
return null;
int space = maxEntries - sharedList.size();
int space = maxEntries - entries.size();
if (space <= 0)
return null;
@ -165,17 +209,18 @@ public class Pool<T> implements AutoCloseable, Dumpable
pending.incrementAndGet();
Entry entry = new Entry();
sharedList.add(entry);
entries.add(entry);
return entry;
}
}
/**
* Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism.
*
* @deprecated No longer supported. Instead use a {@link StrategyType} to configure the pool.
* @param idx the index of the entry to acquire.
* @return the specified entry or null if there is none at the specified index or if it is not available.
*/
@Deprecated
public Entry acquireAt(int idx)
{
if (closed)
@ -183,7 +228,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
try
{
Entry entry = sharedList.get(idx);
Entry entry = entries.get(idx);
if (entry.tryAcquire())
return entry;
}
@ -204,27 +249,54 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (closed)
return null;
// first check the thread-local cache
int size = entries.size();
if (size == 0)
return null;
if (cache != null)
{
List<Entry> cachedList = cache.get();
while (!cachedList.isEmpty())
{
Entry cachedEntry = cachedList.remove(cachedList.size() - 1);
if (cachedEntry.tryAcquire())
return cachedEntry;
}
}
// then iterate the shared list
for (Entry entry : sharedList)
{
if (entry.tryAcquire())
Pool<T>.Entry entry = cache.get();
if (entry != null && entry.tryAcquire())
return entry;
}
int index = startIndex(size);
for (int tries = size; tries-- > 0;)
{
try
{
Pool<T>.Entry entry = entries.get(index);
if (entry != null && entry.tryAcquire())
return entry;
}
catch (IndexOutOfBoundsException e)
{
LOGGER.ignore(e);
size = entries.size();
}
index = (index + 1) % size;
}
return null;
}
private int startIndex(int size)
{
switch (strategyType)
{
case FIRST:
return 0;
case RANDOM:
return ThreadLocalRandom.current().nextInt(size);
case ROUND_ROBIN:
return nextIndex.getAndUpdate(c -> Math.max(0, c + 1)) % size;
case THREAD_ID:
return (int)(Thread.currentThread().getId() % size);
default:
throw new IllegalArgumentException("Unknown strategy type: " + strategyType);
}
}
/**
* Utility method to acquire an entry from the pool,
* reserving and creating a new entry if necessary.
@ -278,17 +350,10 @@ public class Pool<T> implements AutoCloseable, Dumpable
if (closed)
return false;
// first mark it as unused
boolean reusable = entry.tryRelease();
// then cache the released entry
if (cache != null && reusable)
{
List<Entry> cachedList = cache.get();
if (cachedList.size() < cacheSize)
cachedList.add(entry);
}
return reusable;
boolean released = entry.tryRelease();
if (released && cache != null)
cache.set(entry);
return released;
}
/**
@ -309,12 +374,9 @@ public class Pool<T> implements AutoCloseable, Dumpable
return false;
}
boolean removed = sharedList.remove(entry);
if (!removed)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry);
}
boolean removed = entries.remove(entry);
if (!removed && LOGGER.isDebugEnabled())
LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry);
return removed;
}
@ -331,8 +393,8 @@ public class Pool<T> implements AutoCloseable, Dumpable
try (AutoLock l = lock.lock())
{
closed = true;
copy = new ArrayList<>(sharedList);
sharedList.clear();
copy = new ArrayList<>(entries);
entries.clear();
}
// iterate the copy and close its entries
@ -345,29 +407,30 @@ public class Pool<T> implements AutoCloseable, Dumpable
public int size()
{
return sharedList.size();
return entries.size();
}
public Collection<Entry> values()
{
return Collections.unmodifiableCollection(sharedList);
return Collections.unmodifiableCollection(entries);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this);
Dumpable.dumpObjects(out, indent, this,
new DumpableCollection("entries", entries));
}
@Override
public String toString()
{
return String.format("%s@%x[size=%d closed=%s entries=%s]",
return String.format("%s@%x[size=%d closed=%s pending=%d]",
getClass().getSimpleName(),
hashCode(),
sharedList.size(),
entries.size(),
closed,
sharedList);
pending.get());
}
public class Entry
@ -500,6 +563,13 @@ public class Pool<T> implements AutoCloseable, Dumpable
return !(overUsed && newMultiplexingCount == 0);
}
public boolean isOverUsed()
{
int currentMaxUsageCount = maxUsageCount;
int usageCount = state.getHi();
return currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
}
/**
* Try to mark the entry as removed.
* @return true if the entry has to be removed from the containing pool, false otherwise.
@ -549,12 +619,17 @@ public class Pool<T> implements AutoCloseable, Dumpable
public String toString()
{
long encoded = state.get();
return String.format("%s@%x{usage=%d/%d,multiplex=%d/%d,pooled=%s}",
int usageCount = AtomicBiInteger.getHi(encoded);
int multiplexCount = AtomicBiInteger.getLo(encoded);
String state = usageCount < 0 ? "CLOSED" : multiplexCount == 0 ? "IDLE" : "INUSE";
return String.format("%s@%x{%s, usage=%d, multiplex=%d/%d, pooled=%s}",
getClass().getSimpleName(),
hashCode(),
AtomicBiInteger.getHi(encoded),
getMaxUsageCount(),
AtomicBiInteger.getLo(encoded),
state,
Math.max(usageCount, 0),
Math.max(multiplexCount, 0),
getMaxMultiplex(),
pooled);
}

View File

@ -21,17 +21,27 @@ package org.eclipse.jetty.util;
import java.io.Closeable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;
import static java.util.stream.Collectors.toList;
import static org.eclipse.jetty.util.Pool.StrategyType.FIRST;
import static org.eclipse.jetty.util.Pool.StrategyType.RANDOM;
import static org.eclipse.jetty.util.Pool.StrategyType.ROUND_ROBIN;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
@ -40,20 +50,27 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class PoolTest
{
public static Stream<Object[]> cacheSize()
interface Factory
{
Pool<String> getPool(int maxSize);
}
public static Stream<Object[]> strategy()
{
List<Object[]> data = new ArrayList<>();
data.add(new Object[]{0});
data.add(new Object[]{1});
data.add(new Object[]{2});
data.add(new Object[]{(Factory)s -> new Pool<>(FIRST, s)});
data.add(new Object[]{(Factory)s -> new Pool<>(RANDOM, s)});
data.add(new Object[]{(Factory)s -> new Pool<>(FIRST, s, true)});
data.add(new Object[]{(Factory)s -> new Pool<>(ROUND_ROBIN, s)});
return data.stream();
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testAcquireRelease(int cacheSize)
@MethodSource(value = "strategy")
public void testAcquireRelease(Factory factory)
{
Pool<String> pool = new Pool<>(1,cacheSize);
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
@ -94,10 +111,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemoveBeforeRelease(int cacheSize)
@MethodSource(value = "strategy")
public void testRemoveBeforeRelease(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
@ -107,10 +124,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testCloseBeforeRelease(int cacheSize)
@MethodSource(value = "strategy")
public void testCloseBeforeRelease(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
@ -121,10 +138,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMaxPoolSize(int cacheSize)
@MethodSource(value = "strategy")
public void testMaxPoolSize(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
assertThat(pool.size(), is(0));
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.size(), is(1));
@ -133,10 +150,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReserve(int cacheSize)
@MethodSource(value = "strategy")
public void testReserve(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
// Reserve an entry
Pool<String>.Entry e1 = pool.reserve(-1);
@ -196,10 +213,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReserveMaxPending(int cacheSize)
@MethodSource(value = "strategy")
public void testReserveMaxPending(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
assertThat(pool.reserve(0), nullValue());
assertThat(pool.reserve(1), notNullValue());
assertThat(pool.reserve(1), nullValue());
@ -210,20 +227,20 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReserveNegativeMaxPending(int cacheSize)
@MethodSource(value = "strategy")
public void testReserveNegativeMaxPending(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), nullValue());
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testClose(int cacheSize)
@MethodSource(value = "strategy")
public void testClose(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
assertThat(pool.isClosed(), is(false));
pool.close();
@ -235,12 +252,11 @@ public class PoolTest
assertThat(pool.reserve(-1), nullValue());
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testClosingCloseable(int cacheSize)
@Test
public void testClosingCloseable()
{
AtomicBoolean closed = new AtomicBoolean();
Pool<Closeable> pool = new Pool<>(1,0);
Pool<Closeable> pool = new Pool<>(FIRST, 1);
Closeable pooled = () -> closed.set(true);
pool.reserve(-1).enable(pooled, false);
assertThat(closed.get(), is(false));
@ -249,10 +265,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemove(int cacheSize)
@MethodSource(value = "strategy")
public void testRemove(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
@ -264,10 +280,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testValuesSize(int cacheSize)
@MethodSource(value = "strategy")
public void testValuesSize(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
assertThat(pool.size(), is(0));
assertThat(pool.values().isEmpty(), is(true));
@ -278,10 +294,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testValuesContainsAcquiredEntries(int cacheSize)
@MethodSource(value = "strategy")
public void testValuesContainsAcquiredEntries(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
@ -292,10 +308,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testAcquireAt(int cacheSize)
@MethodSource(value = "strategy")
public void testAcquireAt(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
@ -308,10 +324,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMaxUsageCount(int cacheSize)
@MethodSource(value = "strategy")
public void testMaxUsageCount(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa", false);
@ -331,39 +347,54 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMaxMultiplex(int cacheSize)
@MethodSource(value = "strategy")
public void testMaxMultiplex(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
pool.setMaxMultiplex(3);
pool.reserve(-1).enable("aaa", false);
pool.reserve(-1).enable("bbb", false);
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
Pool<String>.Entry e3 = pool.acquire();
Pool<String>.Entry e4 = pool.acquire();
assertThat(e1.getPooled(), equalTo("aaa"));
assertThat(e1, sameInstance(e2));
assertThat(e1, sameInstance(e3));
assertThat(e4.getPooled(), equalTo("bbb"));
assertThat(pool.release(e1), is(true));
Pool<String>.Entry e5 = pool.acquire();
assertThat(e2, sameInstance(e5));
Pool<String>.Entry e6 = pool.acquire();
assertThat(e4, sameInstance(e6));
Map<String, AtomicInteger> counts = new HashMap<>();
AtomicInteger a = new AtomicInteger();
AtomicInteger b = new AtomicInteger();
counts.put("a", a);
counts.put("b", b);
pool.reserve(-1).enable("a", false);
pool.reserve(-1).enable("b", false);
counts.get(pool.acquire().getPooled()).incrementAndGet();
counts.get(pool.acquire().getPooled()).incrementAndGet();
counts.get(pool.acquire().getPooled()).incrementAndGet();
counts.get(pool.acquire().getPooled()).incrementAndGet();
assertThat(a.get(), greaterThan(0));
assertThat(a.get(), lessThanOrEqualTo(3));
assertThat(b.get(), greaterThan(0));
assertThat(b.get(), lessThanOrEqualTo(3));
counts.get(pool.acquire().getPooled()).incrementAndGet();
counts.get(pool.acquire().getPooled()).incrementAndGet();
assertThat(a.get(), is(3));
assertThat(b.get(), is(3));
assertNull(pool.acquire());
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemoveMultiplexed(int cacheSize)
@MethodSource(value = "strategy")
public void testRemoveMultiplexed(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1, notNullValue());
Pool<String>.Entry e2 = pool.acquire();
assertThat(e2, notNullValue());
assertThat(e2, sameInstance(e1));
assertThat(e2.getUsageCount(), is(2));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
assertThat(pool.remove(e1), is(false));
@ -380,10 +411,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexRemoveThenAcquireThenReleaseRemove(int cacheSize)
@MethodSource(value = "strategy")
public void testMultiplexRemoveThenAcquireThenReleaseRemove(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
@ -398,10 +429,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testNonMultiplexRemoveAfterAcquire(int cacheSize)
@MethodSource(value = "strategy")
public void testNonMultiplexRemoveAfterAcquire(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
@ -411,10 +442,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexRemoveAfterAcquire(int cacheSize)
@MethodSource(value = "strategy")
public void testMultiplexRemoveAfterAcquire(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa", false);
@ -436,10 +467,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testReleaseThenRemoveNonEnabledEntry(int cacheSize)
@MethodSource(value = "strategy")
public void testReleaseThenRemoveNonEnabledEntry(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
Pool<String>.Entry e = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.release(e), is(false));
@ -449,10 +480,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testRemoveNonEnabledEntry(int cacheSize)
@MethodSource(value = "strategy")
public void testRemoveNonEnabledEntry(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
Pool<String>.Entry e = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.remove(e), is(true));
@ -460,10 +491,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexMaxUsageReachedAcquireThenRemove(int cacheSize)
@MethodSource(value = "strategy")
public void testMultiplexMaxUsageReachedAcquireThenRemove(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa", false);
@ -479,12 +510,12 @@ public class PoolTest
assertThat(pool.remove(e0), is(true));
assertThat(pool.size(), is(0));
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(int cacheSize)
@MethodSource(value = "strategy")
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa", false);
@ -506,10 +537,10 @@ public class PoolTest
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testUsageCountAfterReachingMaxMultiplexLimit(int cacheSize)
@MethodSource(value = "strategy")
public void testUsageCountAfterReachingMaxMultiplexLimit(Factory factory)
{
Pool<String> pool = new Pool<>(1, cacheSize);
Pool<String> pool = factory.getPool(1);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(10);
pool.reserve(-1).enable("aaa", false);
@ -517,25 +548,25 @@ public class PoolTest
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getUsageCount(), is(1));
Pool<String>.Entry e2 = pool.acquire();
assertThat(e2, sameInstance(e1));
assertThat(e1.getUsageCount(), is(2));
assertThat(pool.acquire(), nullValue());
assertThat(e1.getUsageCount(), is(2));
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testConfigLimits(int cacheSize)
@Test
public void testConfigLimits()
{
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(0));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(-1));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxUsageCount(0));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(0));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxMultiplex(-1));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(FIRST, 1).setMaxUsageCount(0));
}
@ParameterizedTest
@MethodSource(value = "cacheSize")
public void testAcquireWithCreator(int cacheSize)
@MethodSource(value = "strategy")
public void testAcquireWithCreator(Factory factory)
{
Pool<String> pool = new Pool<>(2, cacheSize);
Pool<String> pool = factory.getPool(2);
assertThat(pool.size(), is(0));
assertThat(pool.acquire(e -> null), nullValue());
@ -590,6 +621,67 @@ public class PoolTest
assertThat(pool.size(), is(1));
assertThat(pool.getReservedCount(), is(0));
assertThat(pool.getInUseCount(), is(1));
}
@Test
public void testRoundRobinStrategy()
{
Pool<AtomicInteger> pool = new Pool<>(ROUND_ROBIN, 4);
Pool<AtomicInteger>.Entry e1 = pool.acquire(e -> new AtomicInteger());
Pool<AtomicInteger>.Entry e2 = pool.acquire(e -> new AtomicInteger());
Pool<AtomicInteger>.Entry e3 = pool.acquire(e -> new AtomicInteger());
Pool<AtomicInteger>.Entry e4 = pool.acquire(e -> new AtomicInteger());
assertNull(pool.acquire(e -> new AtomicInteger()));
pool.release(e1);
pool.release(e2);
pool.release(e3);
pool.release(e4);
Pool<AtomicInteger>.Entry last = null;
for (int i = 0; i < 8; i++)
{
Pool<AtomicInteger>.Entry e = pool.acquire();
if (last != null)
assertThat(e, not(sameInstance(last)));
e.getPooled().incrementAndGet();
pool.release(e);
last = e;
}
assertThat(e1.getPooled().get(), is(2));
assertThat(e2.getPooled().get(), is(2));
assertThat(e3.getPooled().get(), is(2));
assertThat(e4.getPooled().get(), is(2));
}
@Test
public void testRandomStrategy()
{
Pool<AtomicInteger> pool = new Pool<>(RANDOM, 4);
Pool<AtomicInteger>.Entry e1 = pool.acquire(e -> new AtomicInteger());
Pool<AtomicInteger>.Entry e2 = pool.acquire(e -> new AtomicInteger());
Pool<AtomicInteger>.Entry e3 = pool.acquire(e -> new AtomicInteger());
Pool<AtomicInteger>.Entry e4 = pool.acquire(e -> new AtomicInteger());
assertNull(pool.acquire(e -> new AtomicInteger()));
pool.release(e1);
pool.release(e2);
pool.release(e3);
pool.release(e4);
for (int i = 0; i < 400; i++)
{
Pool<AtomicInteger>.Entry e = pool.acquire();
e.getPooled().incrementAndGet();
pool.release(e);
}
assertThat(e1.getPooled().get(), greaterThan(10));
assertThat(e2.getPooled().get(), greaterThan(10));
assertThat(e3.getPooled().get(), greaterThan(10));
assertThat(e4.getPooled().get(), greaterThan(10));
}
}

View File

@ -104,7 +104,7 @@ public class XmlConfiguration
.flatMap(p -> Stream.of(p.get()))
.collect(Collectors.toList());
private static final Pool<ConfigurationParser> __parsers =
new Pool<>(Math.min(8, Runtime.getRuntime().availableProcessors()),1);
new Pool<>(Pool.StrategyType.THREAD_ID, Math.min(8, Runtime.getRuntime().availableProcessors()));
public static final Comparator<Executable> EXECUTABLE_COMPARATOR = (e1, e2) ->
{
// Favour methods with less parameters