Merged branch 'jetty-9.4.x' into 'jetty-10.0.x'.

This commit is contained in:
Simone Bordet 2020-07-31 17:47:37 +02:00
commit 2fe01626a5
29 changed files with 1755 additions and 1106 deletions

View File

@ -18,74 +18,121 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static java.util.stream.Collectors.toCollection;
@ManagedObject
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = LoggerFactory.getLogger(AbstractConnectionPool.class);
/**
* The connectionCount encodes both the total connections plus the pending connection counts, so both can be atomically changed.
* The bottom 32 bits represent the total connections and the top 32 bits represent the pending connections.
*/
private final AtomicBiInteger connections = new AtomicBiInteger();
private final AtomicBoolean closed = new AtomicBoolean();
private final HttpDestination destination;
private final int maxConnections;
private final Callback requester;
private final Pool<Connection> pool;
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
this.destination = destination;
this.maxConnections = maxConnections;
this.requester = requester;
@SuppressWarnings("unchecked")
Pool<Connection> pool = destination.getBean(Pool.class);
if (pool == null)
{
pool = new Pool<>(maxConnections, cache ? 1 : 0);
destination.addBean(pool);
}
this.pool = pool;
}
protected HttpDestination getHttpDestination()
@Override
public CompletableFuture<Void> preCreateConnections(int connectionCount)
{
return destination;
CompletableFuture<?>[] futures = new CompletableFuture[connectionCount];
for (int i = 0; i < connectionCount; i++)
{
futures[i] = tryCreateReturningFuture(pool.getMaxEntries());
}
return CompletableFuture.allOf(futures);
}
protected int getMaxMultiplex()
{
return pool.getMaxMultiplex();
}
protected void setMaxMultiplex(int maxMultiplex)
{
pool.setMaxMultiplex(maxMultiplex);
}
protected int getMaxUsageCount()
{
return pool.getMaxUsageCount();
}
protected void setMaxUsageCount(int maxUsageCount)
{
pool.setMaxUsageCount(maxUsageCount);
}
@ManagedAttribute(value = "The number of active connections", readonly = true)
public int getActiveConnectionCount()
{
return pool.getInUseConnectionCount();
}
@ManagedAttribute(value = "The number of idle connections", readonly = true)
public int getIdleConnectionCount()
{
return pool.getIdleConnectionCount();
}
@ManagedAttribute(value = "The max number of connections", readonly = true)
public int getMaxConnectionCount()
{
return maxConnections;
return pool.getMaxEntries();
}
@ManagedAttribute(value = "The number of connections", readonly = true)
public int getConnectionCount()
{
return connections.getLo();
return pool.size();
}
@ManagedAttribute(value = "The number of pending connections", readonly = true)
public int getPendingConnectionCount()
{
return connections.getHi();
return pool.getPendingConnectionCount();
}
@Override
public boolean isEmpty()
{
return connections.getLo() == 0;
return pool.size() == 0;
}
@Override
public boolean isClosed()
{
return closed.get();
return pool.isClosed();
}
@Override
@ -112,101 +159,175 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
*/
protected void tryCreate(int maxPending)
{
while (true)
tryCreateReturningFuture(maxPending);
}
private CompletableFuture<Void> tryCreateReturningFuture(int maxPending)
{
CompletableFuture<Void> future = new CompletableFuture<>();
if (LOG.isDebugEnabled())
LOG.debug("tryCreate {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
Pool<Connection>.Entry entry = pool.reserve(maxPending);
if (entry == null)
{
long encoded = connections.get();
int pending = AtomicBiInteger.getHi(encoded);
int total = AtomicBiInteger.getLo(encoded);
future.complete(null);
return future;
}
if (LOG.isDebugEnabled())
LOG.debug("tryCreate {}/{} connections {}/{} pending", total, maxConnections, pending, maxPending);
if (LOG.isDebugEnabled())
LOG.debug("newConnection {}/{} connections {}/{} pending", pool.size(), pool.getMaxEntries(), getPendingConnectionCount(), maxPending);
if (total >= maxConnections)
return;
if (maxPending >= 0 && pending >= maxPending)
return;
if (connections.compareAndSet(encoded, pending + 1, total + 1))
destination.newConnection(new Promise<>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("newConnection {}/{} connections {}/{} pending", total + 1, maxConnections, pending + 1, maxPending);
destination.newConnection(new Promise<>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", total + 1, maxConnections, connection);
connections.add(-1, 0);
onCreated(connection);
proceed();
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + (total + 1) + "/" + maxConnections + " creation failed", x);
connections.add(-1, -1);
requester.failed(x);
}
});
return;
LOG.debug("Connection {}/{} creation succeeded {}", pool.size(), pool.getMaxEntries(), connection);
adopt(entry, connection);
future.complete(null);
proceed();
}
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + pool.size() + "/" + pool.getMaxEntries() + " creation failed", x);
pool.remove(entry);
future.completeExceptionally(x);
requester.failed(x);
}
});
return future;
}
@Override
public boolean accept(Connection connection)
{
while (true)
{
int count = connections.getLo();
if (count >= maxConnections)
return false;
if (connections.compareAndSetLo(count, count + 1))
return true;
}
Pool<Connection>.Entry entry = pool.reserve(-1);
if (entry == null)
return false;
adopt(entry, connection);
return true;
}
protected abstract void onCreated(Connection connection);
protected void proceed()
{
requester.succeeded();
}
protected abstract Connection activate();
protected Connection active(Connection connection)
private void adopt(Pool<Connection>.Entry entry, Connection connection)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
attachable.setAttachment(entry);
if (LOG.isDebugEnabled())
LOG.debug("Connection active {}", connection);
acquired(connection);
return connection;
LOG.debug("onCreating {}", entry);
onCreated(connection);
entry.enable(connection);
idle(connection, false);
}
protected void acquired(Connection connection)
protected Connection activate()
{
Pool<Connection>.Entry entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activated '{}'", entry);
if (entry != null)
{
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
return null;
}
@Override
public boolean isActive(Connection connection)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
return false;
if (LOG.isDebugEnabled())
LOG.debug("isActive {}", entry);
return !entry.isIdle();
}
@Override
public boolean release(Connection connection)
{
if (!deactivate(connection))
return false;
released(connection);
return idle(connection, isClosed());
}
protected boolean deactivate(Connection connection)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
return true;
if (LOG.isDebugEnabled())
LOG.debug("releasing {}", entry);
boolean reusable = pool.release(entry);
if (!reusable)
{
remove(connection);
return false;
}
return true;
}
@Override
public boolean remove(Connection connection)
{
return remove(connection, false);
}
protected boolean remove(Connection connection, boolean force)
{
if (!(connection instanceof Attachable))
throw new IllegalArgumentException("Invalid connection object: " + connection);
Attachable attachable = (Attachable)connection;
@SuppressWarnings("unchecked")
Pool<Connection>.Entry entry = (Pool<Connection>.Entry)attachable.getAttachment();
if (entry == null)
return false;
attachable.setAttachment(null);
if (LOG.isDebugEnabled())
LOG.debug("removing {}", entry);
boolean removed = pool.remove(entry);
if (removed || force)
{
released(connection);
removed(connection);
}
return removed;
}
protected void onCreated(Connection connection)
{
}
protected boolean idle(Connection connection, boolean close)
{
if (close)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle close {}", connection);
return false;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle {}", connection);
return true;
}
return !close;
}
protected void acquired(Connection connection)
{
}
protected void released(Connection connection)
@ -215,28 +336,68 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
protected void removed(Connection connection)
{
int pooled = connections.addAndGetLo(-1);
if (LOG.isDebugEnabled())
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
}
Queue<Connection> getIdleConnections()
{
return pool.values().stream()
.filter(Pool.Entry::isIdle)
.filter(entry -> !entry.isClosed())
.map(Pool.Entry::getPooled)
.collect(toCollection(ArrayDeque::new));
}
Collection<Connection> getActiveConnections()
{
return pool.values().stream()
.filter(entry -> !entry.isIdle())
.filter(entry -> !entry.isClosed())
.map(Pool.Entry::getPooled)
.collect(Collectors.toList());
}
@Override
public void close()
{
if (closed.compareAndSet(false, true))
{
connections.set(0, 0);
}
}
protected void close(Collection<Connection> connections)
{
connections.forEach(Connection::close);
pool.close();
}
@Override
public String dump()
public void dump(Appendable out, String indent) throws IOException
{
return Dumpable.dump(this);
Dumpable.dumpObjects(out, indent, this);
}
@Override
public boolean sweep()
{
pool.values().stream().filter(entry -> entry.getPooled() instanceof Sweeper.Sweepable).forEach(entry ->
{
Connection connection = entry.getPooled();
if (((Sweeper.Sweepable)connection).sweep())
{
boolean removed = remove(connection);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
});
return false;
}
@Override
public String toString()
{
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getActiveConnectionCount(),
getIdleConnectionCount());
}
}

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.client;
import java.io.Closeable;
import java.util.concurrent.CompletableFuture;
import org.eclipse.jetty.client.api.Connection;
@ -27,6 +28,16 @@ import org.eclipse.jetty.client.api.Connection;
*/
public interface ConnectionPool extends Closeable
{
/**
* Optionally pre-create up to <code>connectionCount</code>
* connections so they are immediately ready for use.
* @param connectionCount the number of connections to pre-start.
*/
default CompletableFuture<Void> preCreateConnections(int connectionCount)
{
return CompletableFuture.completedFuture(null);
}
/**
* @param connection the connection to test
* @return whether the given connection is currently in use

View File

@ -18,303 +18,33 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ManagedObject
public class DuplexConnectionPool extends AbstractConnectionPool implements Sweeper.Sweepable
public class DuplexConnectionPool extends AbstractConnectionPool
{
private static final Logger LOG = LoggerFactory.getLogger(DuplexConnectionPool.class);
private final ReentrantLock lock = new ReentrantLock();
private final Deque<Connection> idleConnections;
private final Set<Connection> activeConnections;
public DuplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
this.idleConnections = new ArrayDeque<>(maxConnections);
this.activeConnections = new HashSet<>(maxConnections);
this(destination, maxConnections, true, requester);
}
protected void lock()
public DuplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{
lock.lock();
}
protected void unlock()
{
lock.unlock();
}
@ManagedAttribute(value = "The number of idle connections", readonly = true)
public int getIdleConnectionCount()
{
lock();
try
{
return idleConnections.size();
}
finally
{
unlock();
}
}
@ManagedAttribute(value = "The number of active connections", readonly = true)
public int getActiveConnectionCount()
{
lock();
try
{
return activeConnections.size();
}
finally
{
unlock();
}
}
public Queue<Connection> getIdleConnections()
{
return idleConnections;
}
public Collection<Connection> getActiveConnections()
{
return activeConnections;
super(destination, maxConnections, cache, requester);
}
@Override
public boolean isActive(Connection connection)
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
{
lock();
try
{
return activeConnections.contains(connection);
}
finally
{
unlock();
}
return super.getMaxUsageCount();
}
@Override
protected void onCreated(Connection connection)
public void setMaxUsageCount(int maxUsageCount)
{
lock();
try
{
// Use "cold" new connections as last.
idleConnections.offer(connection);
}
finally
{
unlock();
}
idle(connection, false);
}
@Override
protected Connection activate()
{
Connection connection;
lock();
try
{
connection = idleConnections.poll();
if (connection == null)
return null;
activeConnections.add(connection);
}
finally
{
unlock();
}
return active(connection);
}
@Override
public boolean release(Connection connection)
{
boolean closed = isClosed();
lock();
try
{
if (!activeConnections.remove(connection))
return false;
if (!closed)
{
// Make sure we use "hot" connections first.
deactivate(connection);
}
}
finally
{
unlock();
}
released(connection);
return idle(connection, closed);
}
protected boolean deactivate(Connection connection)
{
return idleConnections.offerFirst(connection);
}
@Override
public boolean remove(Connection connection)
{
return remove(connection, false);
}
protected boolean remove(Connection connection, boolean force)
{
boolean activeRemoved;
boolean idleRemoved;
lock();
try
{
activeRemoved = activeConnections.remove(connection);
idleRemoved = idleConnections.remove(connection);
}
finally
{
unlock();
}
if (activeRemoved || force)
released(connection);
boolean removed = activeRemoved || idleRemoved || force;
if (removed)
removed(connection);
return removed;
}
@Override
public void close()
{
super.close();
List<Connection> connections = new ArrayList<>();
lock();
try
{
connections.addAll(idleConnections);
idleConnections.clear();
connections.addAll(activeConnections);
activeConnections.clear();
}
finally
{
unlock();
}
close(connections);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
DumpableCollection active;
DumpableCollection idle;
lock();
try
{
active = new DumpableCollection("active", new ArrayList<>(activeConnections));
idle = new DumpableCollection("idle", new ArrayList<>(idleConnections));
}
finally
{
unlock();
}
dump(out, indent, active, idle);
}
protected void dump(Appendable out, String indent, Object... items) throws IOException
{
Dumpable.dumpObjects(out, indent, this, items);
}
@Override
public boolean sweep()
{
List<Connection> toSweep;
lock();
try
{
toSweep = activeConnections.stream()
.filter(connection -> connection instanceof Sweeper.Sweepable)
.collect(Collectors.toList());
}
finally
{
unlock();
}
for (Connection connection : toSweep)
{
if (((Sweeper.Sweepable)connection).sweep())
{
boolean removed = remove(connection, true);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
}
return false;
}
@Override
public String toString()
{
int activeSize;
int idleSize;
lock();
try
{
activeSize = activeConnections.size();
idleSize = idleConnections.size();
}
finally
{
unlock();
}
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
activeSize,
idleSize);
super.setMaxUsageCount(maxUsageCount);
}
}

View File

@ -35,15 +35,17 @@ import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.HttpCookieStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public abstract class HttpConnection implements IConnection
public abstract class HttpConnection implements IConnection, Attachable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnection.class);
private final HttpDestination destination;
private Object attachment;
private int idleTimeoutGuard;
private long idleTimeoutStamp;
@ -271,6 +273,18 @@ public abstract class HttpConnection implements IConnection
}
}
@Override
public void setAttachment(Object obj)
{
this.attachment = obj;
}
@Override
public Object getAttachment()
{
return attachment;
}
@Override
public String toString()
{

View File

@ -427,6 +427,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
{
if (connectionPool.isActive(connection))
{
// trigger the next request after releasing the connection
if (connectionPool.release(connection))
send(false);
else

View File

@ -37,6 +37,7 @@ import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ClientConnector;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.slf4j.Logger;
@ -276,11 +277,12 @@ public class HttpProxy extends ProxyConfiguration.Proxy
}
}
private static class ProxyConnection implements Connection
private static class ProxyConnection implements Connection, Attachable
{
private final Destination destination;
private final Connection connection;
private final Promise<Connection> promise;
private Object attachment;
private ProxyConnection(Destination destination, Connection connection, Promise<Connection> promise)
{
@ -313,6 +315,18 @@ public class HttpProxy extends ProxyConfiguration.Proxy
{
return connection.isClosed();
}
@Override
public void setAttachment(Object obj)
{
this.attachment = obj;
}
@Override
public Object getAttachment()
{
return attachment;
}
}
private static class TunnelPromise implements Promise<Connection>

View File

@ -39,7 +39,7 @@ public class LeakTrackingConnectionPool extends DuplexConnectionPool
public LeakTrackingConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
super((HttpDestination)destination, maxConnections, requester);
start();
}

View File

@ -18,307 +18,47 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable, Sweeper.Sweepable
@ManagedObject
public class MultiplexConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
{
private static final Logger LOG = LoggerFactory.getLogger(MultiplexConnectionPool.class);
private final Deque<Holder> idleConnections;
private final Map<Connection, Holder> activeConnections;
private int maxMultiplex;
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester);
this.idleConnections = new ArrayDeque<>(maxConnections);
this.activeConnections = new LinkedHashMap<>(maxConnections);
this.maxMultiplex = maxMultiplex;
}
@Override
public Connection acquire(boolean create)
{
Connection connection = activate();
if (connection == null && create)
{
int queuedRequests = getHttpDestination().getQueuedRequestCount();
int maxMultiplex = getMaxMultiplex();
int maxPending = ceilDiv(queuedRequests, maxMultiplex);
tryCreate(maxPending);
connection = activate();
}
return connection;
}
/**
* @param a the dividend
* @param b the divisor
* @return the ceiling of the algebraic quotient
*/
private static int ceilDiv(int a, int b)
{
return (a + b - 1) / b;
this(destination, maxConnections, true, requester, maxMultiplex);
}
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, cache, requester);
setMaxMultiplex(maxMultiplex);
}
@Override
@ManagedAttribute(value = "The multiplexing factor of connections")
public int getMaxMultiplex()
{
synchronized (this)
{
return maxMultiplex;
}
return super.getMaxMultiplex();
}
@Override
public void setMaxMultiplex(int maxMultiplex)
{
synchronized (this)
{
this.maxMultiplex = maxMultiplex;
}
super.setMaxMultiplex(maxMultiplex);
}
@Override
public boolean accept(Connection connection)
@ManagedAttribute(value = "The maximum amount of times a connection is used before it gets closed")
public int getMaxUsageCount()
{
boolean accepted = super.accept(connection);
if (LOG.isDebugEnabled())
LOG.debug("Accepted {} {}", accepted, connection);
if (accepted)
{
synchronized (this)
{
Holder holder = new Holder(connection);
activeConnections.put(connection, holder);
++holder.count;
}
active(connection);
}
return accepted;
return super.getMaxUsageCount();
}
@Override
public boolean isActive(Connection connection)
public void setMaxUsageCount(int maxUsageCount)
{
synchronized (this)
{
return activeConnections.containsKey(connection);
}
}
@Override
protected void onCreated(Connection connection)
{
synchronized (this)
{
// Use "cold" connections as last.
idleConnections.offer(new Holder(connection));
}
idle(connection, false);
}
@Override
protected Connection activate()
{
Holder result = null;
synchronized (this)
{
for (Holder holder : activeConnections.values())
{
if (holder.count < maxMultiplex)
{
result = holder;
break;
}
}
if (result == null)
{
Holder holder = idleConnections.poll();
if (holder == null)
return null;
activeConnections.put(holder.connection, holder);
result = holder;
}
++result.count;
}
return active(result.connection);
}
@Override
public boolean release(Connection connection)
{
boolean closed = isClosed();
boolean idle = false;
Holder holder;
synchronized (this)
{
holder = activeConnections.get(connection);
if (holder != null)
{
int count = --holder.count;
if (count == 0)
{
activeConnections.remove(connection);
if (!closed)
{
idleConnections.offerFirst(holder);
idle = true;
}
}
}
}
if (holder == null)
return false;
released(connection);
if (idle || closed)
return idle(connection, closed);
return true;
}
@Override
public boolean remove(Connection connection)
{
return remove(connection, false);
}
protected boolean remove(Connection connection, boolean force)
{
boolean activeRemoved = true;
boolean idleRemoved = false;
synchronized (this)
{
Holder holder = activeConnections.remove(connection);
if (holder == null)
{
activeRemoved = false;
for (Iterator<Holder> iterator = idleConnections.iterator(); iterator.hasNext(); )
{
holder = iterator.next();
if (holder.connection == connection)
{
idleRemoved = true;
iterator.remove();
break;
}
}
}
}
if (activeRemoved || force)
released(connection);
boolean removed = activeRemoved || idleRemoved || force;
if (removed)
removed(connection);
return removed;
}
@Override
public void close()
{
super.close();
List<Connection> connections;
synchronized (this)
{
connections = idleConnections.stream().map(holder -> holder.connection).collect(Collectors.toList());
connections.addAll(activeConnections.keySet());
}
close(connections);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
DumpableCollection active;
DumpableCollection idle;
synchronized (this)
{
active = new DumpableCollection("active", new ArrayList<>(activeConnections.values()));
idle = new DumpableCollection("idle", new ArrayList<>(idleConnections));
}
Dumpable.dumpObjects(out, indent, this, active, idle);
}
@Override
public boolean sweep()
{
List<Connection> toSweep = new ArrayList<>();
synchronized (this)
{
activeConnections.values().stream()
.map(holder -> holder.connection)
.filter(connection -> connection instanceof Sweeper.Sweepable)
.collect(Collectors.toCollection(() -> toSweep));
}
for (Connection connection : toSweep)
{
if (((Sweeper.Sweepable)connection).sweep())
{
boolean removed = remove(connection, true);
LOG.warn("Connection swept: {}{}{} from active connections{}{}",
connection,
System.lineSeparator(),
removed ? "Removed" : "Not removed",
System.lineSeparator(),
dump());
}
}
return false;
}
@Override
public String toString()
{
int activeSize;
int idleSize;
synchronized (this)
{
activeSize = activeConnections.size();
idleSize = idleConnections.size();
}
return String.format("%s@%x[connections=%d/%d/%d,multiplex=%d,active=%d,idle=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
getConnectionCount(),
getMaxConnectionCount(),
getMaxMultiplex(),
activeSize,
idleSize);
}
private static class Holder
{
private final Connection connection;
private int count;
private Holder(Connection connection)
{
this.connection = connection;
}
@Override
public String toString()
{
return String.format("%s[%d]", connection, count);
}
super.setMaxUsageCount(maxUsageCount);
}
}

View File

@ -18,21 +18,22 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.Dumpable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ManagedObject
public class RoundRobinConnectionPool extends AbstractConnectionPool implements ConnectionPool.Multiplexable
public class RoundRobinConnectionPool extends MultiplexConnectionPool
{
private final List<Entry> entries;
private int maxMultiplex;
private int index;
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinConnectionPool.class);
private final AtomicInteger offset = new AtomicInteger();
private final Pool<Connection> pool;
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
@ -41,220 +42,31 @@ public class RoundRobinConnectionPool extends AbstractConnectionPool implements
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester);
entries = new ArrayList<>(maxConnections);
for (int i = 0; i < maxConnections; ++i)
{
entries.add(new Entry());
}
this.maxMultiplex = maxMultiplex;
}
@Override
public int getMaxMultiplex()
{
synchronized (this)
{
return maxMultiplex;
}
}
@Override
public void setMaxMultiplex(int maxMultiplex)
{
synchronized (this)
{
this.maxMultiplex = maxMultiplex;
}
}
/**
* <p>Returns an idle connection, if available, following a round robin algorithm;
* otherwise it always tries to create a new connection, up until the max connection count.</p>
*
* @param create this parameter is ignored and assumed to be always {@code true}
* @return an idle connection or {@code null} if no idle connections are available
*/
@Override
public Connection acquire(boolean create)
{
// The nature of this connection pool is such that a
// connection must always be present in the next slot.
return super.acquire(true);
}
@Override
protected void onCreated(Connection connection)
{
synchronized (this)
{
for (Entry entry : entries)
{
if (entry.connection == null)
{
entry.connection = connection;
break;
}
}
}
idle(connection, false);
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
@Override
protected Connection activate()
{
Connection connection = null;
synchronized (this)
{
int offset = 0;
int capacity = getMaxConnectionCount();
while (offset < capacity)
{
int idx = index + offset;
if (idx >= capacity)
idx -= capacity;
Entry entry = entries.get(idx);
if (entry.connection == null)
break;
if (entry.active < getMaxMultiplex())
{
++entry.active;
++entry.used;
connection = entry.connection;
index += offset + 1;
if (index >= capacity)
index -= capacity;
break;
}
++offset;
}
}
return connection == null ? null : active(connection);
int offset = this.offset.get();
Connection connection = activate(offset);
if (connection != null)
this.offset.getAndIncrement();
return connection;
}
@Override
public boolean isActive(Connection connection)
private Connection activate(int offset)
{
synchronized (this)
Pool<Connection>.Entry entry = pool.acquireAt(Math.abs(offset % pool.getMaxEntries()));
if (LOG.isDebugEnabled())
LOG.debug("activated '{}'", entry);
if (entry != null)
{
for (Entry entry : entries)
{
if (entry.connection == connection)
return entry.active > 0;
}
return false;
}
}
@Override
public boolean release(Connection connection)
{
boolean found = false;
boolean idle = false;
synchronized (this)
{
for (Entry entry : entries)
{
if (entry.connection == connection)
{
found = true;
int active = --entry.active;
idle = active == 0;
break;
}
}
}
if (!found)
return false;
released(connection);
if (idle)
return idle(connection, isClosed());
return true;
}
@Override
public boolean remove(Connection connection)
{
boolean found = false;
synchronized (this)
{
for (Entry entry : entries)
{
if (entry.connection == connection)
{
found = true;
entry.reset();
break;
}
}
}
if (found)
{
released(connection);
removed(connection);
}
return found;
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Entry> connections;
synchronized (this)
{
connections = new ArrayList<>(entries);
}
Dumpable.dumpObjects(out, indent, out, connections);
}
@Override
public String toString()
{
int present = 0;
int active = 0;
synchronized (this)
{
for (Entry entry : entries)
{
if (entry.connection != null)
{
++present;
if (entry.active > 0)
++active;
}
}
}
return String.format("%s@%x[c=%d/%d/%d,a=%d]",
getClass().getSimpleName(),
hashCode(),
getPendingConnectionCount(),
present,
getMaxConnectionCount(),
active
);
}
private static class Entry
{
private Connection connection;
private int active;
private long used;
private void reset()
{
connection = null;
active = 0;
used = 0;
}
@Override
public String toString()
{
return String.format("{u=%d,c=%s}", used, connection);
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
return null;
}
}

View File

@ -19,15 +19,15 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.component.DumpableCollection;
import org.eclipse.jetty.util.thread.Scheduler;
import org.slf4j.Logger;
@ -66,10 +66,10 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
public ValidatingConnectionPool(HttpDestination destination, int maxConnections, Callback requester, Scheduler scheduler, long timeout)
{
super(destination, maxConnections, requester);
super((HttpDestination)destination, maxConnections, requester);
this.scheduler = scheduler;
this.timeout = timeout;
this.quarantine = new HashMap<>(maxConnections);
this.quarantine = new ConcurrentHashMap<>(maxConnections);
}
@ManagedAttribute(value = "The number of validating connections", readonly = true)
@ -81,21 +81,11 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
@Override
public boolean release(Connection connection)
{
lock();
try
{
if (!getActiveConnections().remove(connection))
return false;
Holder holder = new Holder(connection);
holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS);
quarantine.put(connection, holder);
if (LOG.isDebugEnabled())
LOG.debug("Validating for {}ms {}", timeout, connection);
}
finally
{
unlock();
}
Holder holder = new Holder(connection);
holder.task = scheduler.schedule(holder, timeout, TimeUnit.MILLISECONDS);
quarantine.put(connection, holder);
if (LOG.isDebugEnabled())
LOG.debug("Validating for {}ms {}", timeout, connection);
released(connection);
return true;
@ -104,16 +94,7 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
@Override
public boolean remove(Connection connection)
{
Holder holder;
lock();
try
{
holder = quarantine.remove(connection);
}
finally
{
unlock();
}
Holder holder = quarantine.remove(connection);
if (holder == null)
return super.remove(connection);
@ -129,25 +110,16 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
}
@Override
protected void dump(Appendable out, String indent, Object... items) throws IOException
public void dump(Appendable out, String indent) throws IOException
{
DumpableCollection toDump = new DumpableCollection("quarantine", quarantine.values());
super.dump(out, indent, Stream.concat(Stream.of(items), Stream.of(toDump)));
Dumpable.dumpObjects(out, indent, this, toDump);
}
@Override
public String toString()
{
int size;
lock();
try
{
size = quarantine.size();
}
finally
{
unlock();
}
int size = quarantine.size();
return String.format("%s[v=%d]", super.toString(), size);
}
@ -169,20 +141,11 @@ public class ValidatingConnectionPool extends DuplexConnectionPool
if (done.compareAndSet(false, true))
{
boolean closed = isClosed();
lock();
try
{
if (LOG.isDebugEnabled())
LOG.debug("Validated {}", connection);
quarantine.remove(connection);
if (!closed)
deactivate(connection);
}
finally
{
unlock();
}
if (LOG.isDebugEnabled())
LOG.debug("Validated {}", connection);
quarantine.remove(connection);
if (!closed)
deactivate(connection);
idle(connection, closed);
proceed();
}

View File

@ -44,12 +44,13 @@ import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.thread.Sweeper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpConnectionOverHTTP extends AbstractConnection implements IConnection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable
public class HttpConnectionOverHTTP extends AbstractConnection implements IConnection, org.eclipse.jetty.io.Connection.UpgradeFrom, Sweeper.Sweepable, Attachable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverHTTP.class);
@ -161,6 +162,18 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements IConne
return closed.get();
}
@Override
public void setAttachment(Object obj)
{
delegate.setAttachment(obj);
}
@Override
public Object getAttachment()
{
return delegate.getAttachment();
}
@Override
public boolean onIdleExpired()
{

View File

@ -0,0 +1,35 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
public class ConnectionPoolHelper
{
public static Connection acquire(AbstractConnectionPool connectionPool, boolean create)
{
return connectionPool.acquire(create);
}
public static void tryCreate(AbstractConnectionPool connectionPool, int pending)
{
connectionPool.tryCreate(pending);
}
}

View File

@ -16,7 +16,7 @@
// ========================================================================
//
package org.eclipse.jetty.client.http;
package org.eclipse.jetty.client;
import java.lang.reflect.Method;
import java.util.concurrent.CountDownLatch;
@ -24,21 +24,12 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.eclipse.jetty.client.AbstractHttpClientServerTest;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.DuplexHttpDestination;
import org.eclipse.jetty.client.EmptyServerHandler;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.Callback;
import org.hamcrest.Matchers;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
@ -52,7 +43,7 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
public class DuplexHttpDestinationTest extends AbstractHttpClientServerTest
{
@ParameterizedTest
@ArgumentsSource(ScenarioProvider.class)
@ -67,7 +58,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
Connection connection = connectionPool.acquire(true);
assertNull(connection);
// There are no queued requests, so no connection should be created.
connection = pollIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
connection = peekIdleConnection(connectionPool, 1, TimeUnit.SECONDS);
assertNull(connection);
}
}
@ -78,19 +69,19 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
ConnectionPoolHelper.tryCreate(connectionPool, 1);
Connection connection = connectionPool.acquire(false);
Connection connection = ConnectionPoolHelper.acquire(connectionPool, false);
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
}
assertNotNull(connection);
}
@ -102,13 +93,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
ConnectionPoolHelper.tryCreate(connectionPool, 1);
Connection connection1 = connectionPool.acquire(true);
if (connection1 == null)
@ -131,12 +122,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
CountDownLatch idleLatch = new CountDownLatch(1);
CountDownLatch latch = new CountDownLatch(1);
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort()))
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void onCreated(Connection connection)
@ -157,10 +148,10 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
})
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
ConnectionPoolHelper.tryCreate(connectionPool, 1);
// Make sure we entered idleCreated().
assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
@ -171,7 +162,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
assertNull(connection1);
// Trigger creation of a second connection.
connectionPool.tryCreate(1);
ConnectionPoolHelper.tryCreate(connectionPool, 1);
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = connectionPool.acquire(true);
@ -180,9 +171,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
latch.countDown();
// There must be 2 idle connections.
Connection connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
Connection connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection);
connection = pollIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
connection = peekIdleConnection(connectionPool, 5, TimeUnit.SECONDS);
assertNotNull(connection);
}
}
@ -193,13 +184,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
{
start(scenario, new EmptyServerHandler());
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
ConnectionPoolHelper.tryCreate(connectionPool, 1);
Connection connection1 = connectionPool.acquire(true);
if (connection1 == null)
@ -230,13 +221,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
long idleTimeout = 1000;
startClient(scenario, httpClient -> httpClient.setIdleTimeout(idleTimeout));
try (TestDestination destination = new TestDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
try (HttpDestination destination = new DuplexHttpDestination(client, new Origin("http", "localhost", connector.getLocalPort())))
{
destination.start();
TestDestination.TestConnectionPool connectionPool = (TestDestination.TestConnectionPool)destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger creation of one connection.
connectionPool.tryCreate(1);
ConnectionPoolHelper.tryCreate(connectionPool, 1);
Connection connection1 = connectionPool.acquire(true);
if (connection1 == null)
@ -247,7 +238,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = connectionPool.getIdleConnections().poll();
connection1 = connectionPool.getIdleConnections().peek();
assertNull(connection1);
}
}
@ -353,11 +344,6 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
assertTrue(client.getDestinations().isEmpty(), "Destination must be removed after connection error");
}
private Connection pollIdleConnection(DuplexConnectionPool connectionPool, long time, TimeUnit unit) throws InterruptedException
{
return await(() -> connectionPool.getIdleConnections().poll(), time, unit);
}
private Connection peekIdleConnection(DuplexConnectionPool connectionPool, long time, TimeUnit unit) throws InterruptedException
{
return await(() -> connectionPool.getIdleConnections().peek(), time, unit);
@ -375,38 +361,4 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
}
return null;
}
private static class TestDestination extends DuplexHttpDestination
{
public TestDestination(HttpClient client, Origin origin)
{
super(client, origin);
}
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new TestConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
public static class TestConnectionPool extends DuplexConnectionPool
{
public TestConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
}
@Override
public void tryCreate(int maxPending)
{
super.tryCreate(maxPending);
}
@Override
public Connection acquire(boolean create)
{
return super.acquire(create);
}
}
}
}

View File

@ -661,7 +661,7 @@ public class HttpClientTLSTest
HttpDestination destination = client.resolveDestination(origin);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger the creation of a new connection, but don't use it.
connectionPool.tryCreate(-1);
ConnectionPoolHelper.tryCreate(connectionPool, -1);
// Verify that the connection has been created.
while (true)
{
@ -759,7 +759,7 @@ public class HttpClientTLSTest
HttpDestination destination = client.resolveDestination(origin);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
// Trigger the creation of a new connection, but don't use it.
connectionPool.tryCreate(-1);
ConnectionPoolHelper.tryCreate(connectionPool, -1);
// Verify that the connection has been created.
while (true)
{

View File

@ -74,17 +74,14 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Collection<Connection> idleConnections = connectionPool.getIdleConnections();
assertEquals(0, idleConnections.size());
Collection<Connection> activeConnections = connectionPool.getActiveConnections();
assertEquals(0, activeConnections.size());
assertEquals(0, connectionPool.getIdleConnections().size());
assertEquals(0, connectionPool.getActiveConnections().size());
request.onRequestSuccess(r -> successLatch.countDown())
.onResponseHeaders(response ->
{
assertEquals(0, idleConnections.size());
assertEquals(1, activeConnections.size());
assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size());
assertEquals(1, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size());
headersLatch.countDown();
})
.send(new Response.Listener.Adapter()
@ -106,8 +103,8 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
assertTrue(headersLatch.await(30, TimeUnit.SECONDS));
assertTrue(successLatch.await(30, TimeUnit.SECONDS));
assertEquals(1, idleConnections.size());
assertEquals(0, activeConnections.size());
assertEquals(1, ((DuplexConnectionPool)destination.getConnectionPool()).getIdleConnections().size());
assertEquals(0, ((DuplexConnectionPool)destination.getConnectionPool()).getActiveConnections().size());
}
@ParameterizedTest
@ -124,18 +121,15 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
HttpDestination destination = (HttpDestination)client.resolveDestination(request);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Collection<Connection> idleConnections = connectionPool.getIdleConnections();
assertEquals(0, idleConnections.size());
Collection<Connection> activeConnections = connectionPool.getActiveConnections();
assertEquals(0, activeConnections.size());
assertEquals(0, connectionPool.getIdleConnections().size());
assertEquals(0, connectionPool.getActiveConnections().size());
request.listener(new Request.Listener.Adapter()
{
@Override
public void onBegin(Request request)
{
activeConnections.iterator().next().close();
connectionPool.getActiveConnections().iterator().next().close();
beginLatch.countDown();
}
@ -144,24 +138,23 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
{
failureLatch.countDown();
}
})
.send(new Response.Listener.Adapter()
}).send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
@Override
public void onComplete(Result result)
{
assertTrue(result.isFailed());
assertEquals(0, idleConnections.size());
assertEquals(0, activeConnections.size());
failureLatch.countDown();
}
});
assertTrue(result.isFailed());
assertEquals(0, connectionPool.getIdleConnections().size());
assertEquals(0, connectionPool.getActiveConnections().size());
failureLatch.countDown();
}
});
assertTrue(beginLatch.await(30, TimeUnit.SECONDS));
assertTrue(failureLatch.await(30, TimeUnit.SECONDS));
assertEquals(0, idleConnections.size());
assertEquals(0, activeConnections.size());
assertEquals(0, connectionPool.getIdleConnections().size());
assertEquals(0, connectionPool.getActiveConnections().size());
}
@ParameterizedTest

View File

@ -51,13 +51,14 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.RetainableByteBuffer;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class HttpConnectionOverFCGI extends AbstractConnection implements IConnection
public class HttpConnectionOverFCGI extends AbstractConnection implements IConnection, Attachable
{
private static final Logger LOG = LoggerFactory.getLogger(HttpConnectionOverFCGI.class);
@ -71,6 +72,7 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
private final Delegate delegate;
private final ClientParser parser;
private RetainableByteBuffer networkBuffer;
private Object attachment;
public HttpConnectionOverFCGI(EndPoint endPoint, HttpDestination destination, Promise<Connection> promise)
{
@ -265,6 +267,18 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements IConne
return closed.get();
}
@Override
public void setAttachment(Object obj)
{
this.attachment = obj;
}
@Override
public Object getAttachment()
{
return attachment;
}
protected boolean closeByHTTP(HttpFields fields)
{
if (!fields.contains(HttpHeader.CONNECTION, HttpHeaderValue.CLOSE.asString()))

View File

@ -22,6 +22,7 @@ import java.io.Closeable;
import org.eclipse.jetty.http2.api.Stream;
import org.eclipse.jetty.http2.frames.Frame;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
/**
@ -29,21 +30,8 @@ import org.eclipse.jetty.util.Callback;
* <p>This class extends {@link Stream} by adding the methods required to
* implement the HTTP/2 stream functionalities.</p>
*/
public interface IStream extends Stream, Closeable
public interface IStream extends Stream, Attachable, Closeable
{
/**
* @return the object attached to this stream
* @see #setAttachment(Object)
*/
public Object getAttachment();
/**
* Attaches the given object to this stream for later retrieval.
*
* @param attachment the object to attach to this stream
*/
public void setAttachment(Object attachment);
/**
* @return whether this stream is local or remote
*/

View File

@ -1707,7 +1707,7 @@ public class Request implements HttpServletRequest
// TODO this is not really right for CONNECT
path = _uri.isAbsolute() ? "/" : null;
else if (encoded.startsWith("/"))
path = (encoded.length() == 1) ? "/" : _uri.getDecodedPath();
path = (encoded.length() == 1) ? "/" : URIUtil.canonicalPath(_uri.getDecodedPath());
else if ("*".equals(encoded) || HttpMethod.CONNECT.is(getMethod()))
path = encoded;
else

View File

@ -41,6 +41,7 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.DispatcherType;
import javax.servlet.Filter;
import javax.servlet.FilterRegistration;
@ -223,11 +224,14 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
public enum Availability
{
UNAVAILABLE, STARTING, AVAILABLE, SHUTDOWN,
STOPPED, // stopped and can't be made unavailable nor shutdown
STARTING, // starting inside of doStart. It may go to any of the next states.
AVAILABLE, // running normally
UNAVAILABLE, // Either a startup error or explicit call to setAvailable(false)
SHUTDOWN, // graceful shutdown
}
;
private volatile Availability _availability = Availability.UNAVAILABLE;
private final AtomicReference<Availability> _availability = new AtomicReference<>(Availability.STOPPED);
public ContextHandler()
{
@ -725,7 +729,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
@ManagedAttribute("true for graceful shutdown, which allows existing requests to complete")
public boolean isShutdown()
{
return _availability == Availability.SHUTDOWN;
return _availability.get() == Availability.SHUTDOWN;
}
/**
@ -735,10 +739,25 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
@Override
public CompletableFuture<Void> shutdown()
{
_availability = isRunning() ? Availability.SHUTDOWN : Availability.UNAVAILABLE;
CompletableFuture<Void> shutdown = new CompletableFuture<Void>();
shutdown.complete(null);
return shutdown;
while (true)
{
Availability availability = _availability.get();
switch (availability)
{
case STOPPED:
return CompletableFuture.failedFuture(new IllegalStateException(getState()));
case STARTING:
case AVAILABLE:
case UNAVAILABLE:
if (!_availability.compareAndSet(availability, Availability.SHUTDOWN))
continue;
break;
default:
break;
}
break;
}
return CompletableFuture.completedFuture(null);
}
/**
@ -746,7 +765,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
*/
public boolean isAvailable()
{
return _availability == Availability.AVAILABLE;
return _availability.get() == Availability.AVAILABLE;
}
/**
@ -756,12 +775,46 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
*/
public void setAvailable(boolean available)
{
synchronized (this)
// Only supported state transitions are:
// UNAVAILABLE --true---> AVAILABLE
// STARTING -----false--> UNAVAILABLE
// AVAILABLE ----false--> UNAVAILABLE
if (available)
{
if (available && isRunning())
_availability = Availability.AVAILABLE;
else if (!available || !isRunning())
_availability = Availability.UNAVAILABLE;
while (true)
{
Availability availability = _availability.get();
switch (availability)
{
case AVAILABLE:
break;
case UNAVAILABLE:
if (!_availability.compareAndSet(availability, Availability.AVAILABLE))
continue;
break;
default:
throw new IllegalStateException(availability.toString());
}
break;
}
}
else
{
while (true)
{
Availability availability = _availability.get();
switch (availability)
{
case STARTING:
case AVAILABLE:
if (!_availability.compareAndSet(availability, Availability.UNAVAILABLE))
continue;
break;
default:
break;
}
break;
}
}
}
@ -778,7 +831,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
@Override
protected void doStart() throws Exception
{
_availability = Availability.STARTING;
_availability.set(Availability.STARTING);
if (_contextPath == null)
throw new IllegalStateException("Null contextPath");
@ -815,13 +868,12 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
contextInitialized();
_availability = Availability.AVAILABLE;
_availability.compareAndSet(Availability.STARTING, Availability.AVAILABLE);
LOG.info("Started {}", this);
}
finally
{
if (_availability == Availability.STARTING)
_availability = Availability.UNAVAILABLE;
_availability.compareAndSet(Availability.STARTING, Availability.UNAVAILABLE);
exitScope(null);
__context.set(oldContext);
// reset the classloader
@ -969,7 +1021,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
if (getServer().isDryRun())
return;
if (LOG.isDebugEnabled())
LOG.debug("contextInitialized: {}->{}", e, l);
l.contextInitialized(e);
@ -979,7 +1031,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
{
if (getServer().isDryRun())
return;
if (LOG.isDebugEnabled())
LOG.debug("contextDestroyed: {}->{}", e, l);
l.contextDestroyed(e);
@ -991,7 +1043,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
// Should we attempt a graceful shutdown?
MultiException mex = null;
_availability = Availability.UNAVAILABLE;
_availability.set(Availability.STOPPED);
ClassLoader oldClassloader = null;
ClassLoader oldWebapploader = null;
@ -1146,8 +1198,10 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
return false;
}
switch (_availability)
switch (_availability.get())
{
case STOPPED:
return false;
case SHUTDOWN:
case UNAVAILABLE:
baseRequest.setHandled(true);
@ -1502,6 +1556,8 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
*/
public void setClassLoader(ClassLoader classLoader)
{
if (isStarted())
throw new IllegalStateException(getState());
_classLoader = classLoader;
}
@ -1774,7 +1830,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
b.append('{');
if (getDisplayName() != null)
b.append(getDisplayName()).append(',');
b.append(getContextPath()).append(',').append(getBaseResource()).append(',').append(_availability);
b.append(getContextPath()).append(',').append(getBaseResource()).append(',').append(_availability.get());
if (vhosts != null && vhosts.length > 0)
b.append(',').append(vhosts[0]);
@ -1783,7 +1839,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
return b.toString();
}
public synchronized Class<?> loadClass(String className) throws ClassNotFoundException
public Class<?> loadClass(String className) throws ClassNotFoundException
{
if (className == null)
return null;
@ -2260,7 +2316,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
}
@Override
public synchronized Object getAttribute(String name)
public Object getAttribute(String name)
{
Object o = ContextHandler.this.getAttribute(name);
if (o == null)
@ -2269,7 +2325,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
}
@Override
public synchronized Enumeration<String> getAttributeNames()
public Enumeration<String> getAttributeNames()
{
HashSet<String> set = new HashSet<>();
Enumeration<String> e = super.getAttributeNames();
@ -2287,7 +2343,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
}
@Override
public synchronized void setAttribute(String name, Object value)
public void setAttribute(String name, Object value)
{
Object oldValue = super.getAttribute(name);
@ -2313,7 +2369,7 @@ public class ContextHandler extends ScopedHandler implements Attributes, Gracefu
}
@Override
public synchronized void removeAttribute(String name)
public void removeAttribute(String name)
{
Object oldValue = super.getAttribute(name);
super.removeAttribute(name);

View File

@ -417,6 +417,18 @@ public class ContextHandlerTest
assertThat(connector.getResponse("GET /foo/xxx HTTP/1.0\n\n"), Matchers.containsString("ctx='/foo'"));
assertThat(connector.getResponse("GET /foo/bar/xxx HTTP/1.0\n\n"), Matchers.containsString("ctx='/foo/bar'"));
// If we make foobar unavailable, then requests will be handled by 503
foobar.setAvailable(false);
assertThat(connector.getResponse("GET / HTTP/1.0\n\n"), Matchers.containsString("ctx=''"));
assertThat(connector.getResponse("GET /foo/xxx HTTP/1.0\n\n"), Matchers.containsString("ctx='/foo'"));
assertThat(connector.getResponse("GET /foo/bar/xxx HTTP/1.0\n\n"), Matchers.containsString(" 503 "));
// If we make foobar available, then requests will be handled normally
foobar.setAvailable(true);
assertThat(connector.getResponse("GET / HTTP/1.0\n\n"), Matchers.containsString("ctx=''"));
assertThat(connector.getResponse("GET /foo/xxx HTTP/1.0\n\n"), Matchers.containsString("ctx='/foo'"));
assertThat(connector.getResponse("GET /foo/bar/xxx HTTP/1.0\n\n"), Matchers.containsString("ctx='/foo/bar'"));
// If we stop foobar, then requests will be handled by foo
foobar.stop();
assertThat(connector.getResponse("GET / HTTP/1.0\n\n"), Matchers.containsString("ctx=''"));

View File

@ -150,12 +150,28 @@ public class DefaultServletRangesTest
String boundary = body.substring(0, body.indexOf("\r\n"));
assertResponseContains("206 Partial", response);
assertResponseContains("Content-Type: multipart/byteranges; boundary=", response);
assertResponseContains("Content-Range: bytes 0-9/80", response);
assertResponseContains("Content-Range: bytes 20-29/80", response);
assertResponseContains("Content-Range: bytes 40-49/80", response);
assertResponseContains(DATA.substring(0, 10), response);
assertResponseContains(DATA.substring(20, 30), response);
assertResponseContains(DATA.substring(40, 50), response);
String section1 = boundary + "\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Range: bytes 0-9/80\r\n" +
"\r\n" +
DATA.substring(0, 10) + "\r\n";
assertResponseContains(section1, response);
String section2 = boundary + "\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Range: bytes 20-29/80\r\n" +
"\r\n" +
DATA.substring(20, 30) + "\r\n";
assertResponseContains(section2, response);
String section3 = boundary + "\r\n" +
"Content-Type: text/plain\r\n" +
"Content-Range: bytes 40-49/80\r\n" +
"\r\n" +
DATA.substring(40, 50) + "\r\n";
assertResponseContains(section3, response);
assertTrue(body.endsWith(boundary + "--\r\n"));
}

View File

@ -0,0 +1,38 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
/**
* Abstract mechanism to support attachment of miscellaneous objects.
*/
public interface Attachable
{
/**
* @return the object attached to this stream
* @see #setAttachment(Object)
*/
Object getAttachment();
/**
* Attaches the given object to this stream for later retrieval.
*
* @param attachment the object to attach to this stream
*/
void setAttachment(Object attachment);
}

View File

@ -29,8 +29,8 @@ import java.nio.charset.StandardCharsets;
public class MultiPartOutputStream extends FilterOutputStream
{
private static final byte[] __CRLF = {'\r', '\n'};
private static final byte[] __DASHDASH = {'-', '-'};
private static final byte[] CRLF = {'\r', '\n'};
private static final byte[] DASHDASH = {'-', '-'};
public static final String MULTIPART_MIXED = "multipart/mixed";
public static final String MULTIPART_X_MIXED_REPLACE = "multipart/x-mixed-replace";
@ -71,11 +71,11 @@ public class MultiPartOutputStream extends FilterOutputStream
try
{
if (inPart)
out.write(__CRLF);
out.write(__DASHDASH);
out.write(CRLF);
out.write(DASHDASH);
out.write(boundaryBytes);
out.write(__DASHDASH);
out.write(__CRLF);
out.write(DASHDASH);
out.write(CRLF);
inPart = false;
}
finally
@ -104,15 +104,19 @@ public class MultiPartOutputStream extends FilterOutputStream
throws IOException
{
if (inPart)
out.write(__CRLF);
{
out.write(CRLF);
}
inPart = true;
out.write(__DASHDASH);
out.write(DASHDASH);
out.write(boundaryBytes);
out.write(__CRLF);
out.write(CRLF);
if (contentType != null)
{
out.write(("Content-Type: " + contentType).getBytes(StandardCharsets.ISO_8859_1));
out.write(__CRLF);
out.write(__CRLF);
out.write(CRLF);
}
out.write(CRLF);
}
/**
@ -126,20 +130,22 @@ public class MultiPartOutputStream extends FilterOutputStream
throws IOException
{
if (inPart)
out.write(__CRLF);
out.write(CRLF);
inPart = true;
out.write(__DASHDASH);
out.write(DASHDASH);
out.write(boundaryBytes);
out.write(__CRLF);
out.write(CRLF);
if (contentType != null)
{
out.write(("Content-Type: " + contentType).getBytes(StandardCharsets.ISO_8859_1));
out.write(__CRLF);
out.write(CRLF);
}
for (int i = 0; headers != null && i < headers.length; i++)
{
out.write(headers[i].getBytes(StandardCharsets.ISO_8859_1));
out.write(__CRLF);
out.write(CRLF);
}
out.write(__CRLF);
out.write(CRLF);
}
@Override
@ -148,7 +154,3 @@ public class MultiPartOutputStream extends FilterOutputStream
out.write(b, off, len);
}
}

View File

@ -0,0 +1,457 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* A fast container of poolable objects, with optional support for
* multiplexing, max usage count and thread-local caching.
* <p>
* The thread-local caching mechanism is about remembering up to N previously
* used entries into a thread-local single-threaded collection.
* When that collection is not empty, its entries are removed one by one
* during acquisition until an entry that can be acquired is found.
* This can greatly speed up acquisition when both the acquisition and the
* release of the entries is done on the same thread as this avoids iterating
* the global, thread-safe collection of entries.
* @param <T>
*/
public class Pool<T> implements AutoCloseable, Dumpable
{
private static final Logger LOGGER = Log.getLogger(Pool.class);
private final List<Entry> sharedList = new CopyOnWriteArrayList<>();
/*
* The cache is used to avoid hammering on the first index of the entry list.
* Caches can become poisoned (i.e.: containing entries that are in use) when
* the release isn't done by the acquiring thread or when the entry pool is
* undersized compared to the load applied on it.
* When an entry can't be found in the cache, the global list is iterated
* normally so the cache has no visible effect besides performance.
*/
private final ThreadLocal<List<Entry>> cache;
private final Lock lock = new ReentrantLock();
private final int maxEntries;
private final int cacheSize;
private volatile boolean closed;
private volatile int maxMultiplex = 1;
private volatile int maxUsageCount = -1;
/**
* Construct a Pool with the specified thread-local cache size.
*
* @param maxEntries the maximum amount of entries that the pool will accept.
* @param cacheSize the thread-local cache size. A value less than 1 means the cache is disabled.
*/
public Pool(int maxEntries, int cacheSize)
{
this.maxEntries = maxEntries;
this.cacheSize = cacheSize;
if (cacheSize > 0)
this.cache = ThreadLocal.withInitial(() -> new ArrayList<Entry>(cacheSize));
else
this.cache = null;
}
public int getPendingConnectionCount()
{
return (int)sharedList.stream().filter(entry -> entry.getPooled() == null).count();
}
public int getIdleConnectionCount()
{
return (int)sharedList.stream().filter(Entry::isIdle).count();
}
public int getInUseConnectionCount()
{
return (int)sharedList.stream().filter(entry -> !entry.isIdle()).count();
}
public int getMaxEntries()
{
return maxEntries;
}
public int getMaxMultiplex()
{
return maxMultiplex;
}
public final void setMaxMultiplex(int maxMultiplex)
{
if (maxMultiplex < 1)
throw new IllegalArgumentException("Max multiplex must be >= 1");
this.maxMultiplex = maxMultiplex;
}
public int getMaxUsageCount()
{
return maxUsageCount;
}
public final void setMaxUsageCount(int maxUsageCount)
{
if (maxUsageCount == 0)
throw new IllegalArgumentException("Max usage count must be != 0");
this.maxUsageCount = maxUsageCount;
}
/**
* Create a new disabled slot into the pool. The returned entry
* won't be acquirable as long as {@link Entry#enable(Object)}
* has not been called.
*
* @param maxReservations the max desired number of reserved entries,
* or a negative number to always trigger the reservation of a new entry.
* @return a disabled entry that is contained in the pool,
* or null if the pool is closed or if the pool already contains
* {@link #getMaxEntries()} entries.
*/
public Entry reserve(int maxReservations)
{
if (maxReservations >= 0 && getPendingConnectionCount() >= maxReservations)
return null;
lock.lock();
try
{
if (!closed && sharedList.size() < maxEntries)
{
Entry entry = new Entry();
sharedList.add(entry);
return entry;
}
return null;
}
finally
{
lock.unlock();
}
}
/**
* Acquire the entry from the pool at the specified index. This method bypasses the thread-local mechanism.
*
* @param idx the index of the entry to acquire.
* @return the specified entry or null if there is none at the specified index or if it is not available.
*/
public Entry acquireAt(int idx)
{
if (closed)
return null;
try
{
Entry entry = sharedList.get(idx);
if (entry.tryAcquire())
return entry;
}
catch (IndexOutOfBoundsException e)
{
// no entry at that index
}
return null;
}
/**
* Acquire an entry from the pool.
*
* @return an entry from the pool or null if none is available.
*/
public Entry acquire()
{
if (closed)
return null;
// first check the thread-local cache
if (cache != null)
{
List<Entry> cachedList = cache.get();
while (!cachedList.isEmpty())
{
Entry cachedEntry = cachedList.remove(cachedList.size() - 1);
if (cachedEntry.tryAcquire())
return cachedEntry;
}
}
// then iterate the shared list
for (Entry entry : sharedList)
{
if (entry.tryAcquire())
return entry;
}
return null;
}
/**
* This method will return an acquired object to the pool. Objects
* that are acquired from the pool but never released will result
* in a memory leak.
*
* @param entry the value to return to the pool
* @return true if the entry was released and could be acquired again,
* false if the entry should be removed by calling {@link #remove(Pool.Entry)}
* and the object contained by the entry should be disposed.
* @throws NullPointerException if value is null
*/
public boolean release(Entry entry)
{
if (closed)
return false;
// first mark it as unused
boolean reusable = entry.tryRelease();
// then cache the released entry
if (cache != null && reusable)
{
List<Entry> cachedList = cache.get();
if (cachedList.size() < cacheSize)
cachedList.add(entry);
}
return reusable;
}
/**
* Remove a value from the pool.
*
* @param entry the value to remove
* @return true if the entry was removed, false otherwise
*/
public boolean remove(Entry entry)
{
if (closed)
return false;
if (!entry.tryRemove())
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Attempt to remove an object from the pool that is still in use: {}", entry);
return false;
}
boolean removed = sharedList.remove(entry);
if (!removed)
{
if (LOGGER.isDebugEnabled())
LOGGER.debug("Attempt to remove an object from the pool that does not exist: {}", entry);
}
return removed;
}
public boolean isClosed()
{
return closed;
}
@Override
public void close()
{
List<Entry> copy;
lock.lock();
try
{
closed = true;
copy = new ArrayList<>(sharedList);
sharedList.clear();
}
finally
{
lock.unlock();
}
// iterate the copy and close its entries
for (Entry entry : copy)
{
if (entry.tryRemove() && entry.pooled instanceof Closeable)
{
try
{
((Closeable)entry.pooled).close();
}
catch (IOException e)
{
LOGGER.warn("Error closing entry {}", entry, e);
}
}
}
}
public int size()
{
return sharedList.size();
}
public Collection<Entry> values()
{
return Collections.unmodifiableCollection(sharedList);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
Dumpable.dumpObjects(out, indent, this);
}
@Override
public String toString()
{
return getClass().getSimpleName() + " size=" + sharedList.size() + " closed=" + closed + " entries=" + sharedList;
}
public class Entry
{
// hi: positive=open/maxUsage counter,negative=closed lo: multiplexing counter
private final AtomicBiInteger state;
private volatile T pooled;
public Entry()
{
this.state = new AtomicBiInteger(-1, 0);
}
public T getPooled()
{
return pooled;
}
public void enable(T pooled)
{
if (!isClosed())
throw new IllegalStateException("Open entries cannot be enabled : " + this);
Objects.requireNonNull(pooled);
this.pooled = pooled;
state.set(0, 0);
}
/**
* Try to acquire the entry if possible by incrementing both the usage
* count and the multiplex count.
* @return true if the usage count is &lt;= maxUsageCount and
* the multiplex count is maxMultiplex and the entry is not closed,
* false otherwise.
*/
public boolean tryAcquire()
{
while (true)
{
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
boolean closed = usageCount < 0;
int multiplexingCount = AtomicBiInteger.getLo(encoded);
int currentMaxUsageCount = maxUsageCount;
if (closed || multiplexingCount >= maxMultiplex || (currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount))
return false;
if (state.compareAndSet(encoded, usageCount + 1, multiplexingCount + 1))
return true;
}
}
/**
* Try to release the entry if possible by decrementing the multiplexing
* count unless the entity is closed.
* @return true if the entry was released,
* false if {@link #tryRemove()} should be called.
*/
public boolean tryRelease()
{
int newMultiplexingCount;
int usageCount;
while (true)
{
long encoded = state.get();
usageCount = AtomicBiInteger.getHi(encoded);
boolean closed = usageCount < 0;
if (closed)
return false;
newMultiplexingCount = AtomicBiInteger.getLo(encoded) - 1;
if (newMultiplexingCount < 0)
throw new IllegalStateException("Cannot release an already released entry");
if (state.compareAndSet(encoded, usageCount, newMultiplexingCount))
break;
}
int currentMaxUsageCount = maxUsageCount;
boolean overUsed = currentMaxUsageCount > 0 && usageCount >= currentMaxUsageCount;
return !(overUsed && newMultiplexingCount == 0);
}
/**
* Try to mark the entry as removed.
* @return true if the entry has to be removed from the containing pool, false otherwise.
*/
public boolean tryRemove()
{
while (true)
{
long encoded = state.get();
int usageCount = AtomicBiInteger.getHi(encoded);
int multiplexCount = AtomicBiInteger.getLo(encoded);
int newMultiplexCount = Math.max(multiplexCount - 1, 0);
boolean removed = state.compareAndSet(usageCount, -1, multiplexCount, newMultiplexCount);
if (removed)
return newMultiplexCount == 0;
}
}
public boolean isClosed()
{
return state.getHi() < 0;
}
public boolean isIdle()
{
return state.getLo() <= 0;
}
public int getUsageCount()
{
return Math.max(state.getHi(), 0);
}
@Override
public String toString()
{
long encoded = state.get();
return super.toString() + " stateHi=" + AtomicBiInteger.getHi(encoded) +
" stateLo=" + AtomicBiInteger.getLo(encoded) + " pooled=" + pooled;
}
}
}

View File

@ -0,0 +1,442 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.util;
import java.io.Closeable;
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.jupiter.api.Test;
import static java.util.stream.Collectors.toList;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.hamcrest.Matchers.sameInstance;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class PoolTest
{
@Test
public void testAcquireRelease()
{
Pool<String> pool = new Pool<>(1,0);
pool.reserve(-1).enable("aaa");
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getPooled(), equalTo("aaa"));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
assertThat(pool.acquire(), nullValue());
assertThat(pool.release(e1), is(true));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
assertThrows(IllegalStateException.class, () -> pool.release(e1));
Pool<String>.Entry e2 = pool.acquire();
assertThat(e2.getPooled(), equalTo("aaa"));
assertThat(pool.release(e2), is(true));
assertThrows(NullPointerException.class, () -> pool.release(null));
}
@Test
public void testRemoveBeforeRelease()
{
Pool<String> pool = new Pool<>(1,0);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
assertThat(pool.remove(e1), is(false));
assertThat(pool.release(e1), is(false));
}
@Test
public void testCloseBeforeRelease()
{
Pool<String> pool = new Pool<>(1,0);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.size(), is(1));
pool.close();
assertThat(pool.size(), is(0));
assertThat(pool.release(e1), is(false));
}
@Test
public void testMaxPoolSize()
{
Pool<String> pool = new Pool<>(1, 0);
assertThat(pool.size(), is(0));
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.size(), is(1));
assertThat(pool.reserve(-1), nullValue());
assertThat(pool.size(), is(1));
}
@Test
public void testReserve()
{
Pool<String> pool = new Pool<>(2, 0);
Pool<String>.Entry entry = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.acquire(), nullValue());
assertThat(entry.isClosed(), is(true));
assertThrows(NullPointerException.class, () -> entry.enable(null));
assertThat(pool.acquire(), nullValue());
assertThat(entry.isClosed(), is(true));
entry.enable("aaa");
assertThat(entry.isClosed(), is(false));
assertThat(pool.acquire().getPooled(), notNullValue());
assertThrows(IllegalStateException.class, () -> entry.enable("bbb"));
Pool<String>.Entry e2 = pool.reserve(-1);
assertThat(pool.size(), is(2));
assertThat(pool.remove(e2), is(true));
assertThat(pool.size(), is(1));
pool.reserve(-1);
assertThat(pool.size(), is(2));
pool.close();
assertThat(pool.size(), is(0));
assertThat(pool.reserve(-1), nullValue());
assertThat(entry.isClosed(), is(true));
}
@Test
public void testReserveMaxPending()
{
Pool<String> pool = new Pool<>(2, 0);
assertThat(pool.reserve(0), nullValue());
assertThat(pool.reserve(1), notNullValue());
assertThat(pool.reserve(1), nullValue());
assertThat(pool.reserve(2), notNullValue());
assertThat(pool.reserve(2), nullValue());
assertThat(pool.reserve(3), nullValue());
assertThat(pool.reserve(-1), nullValue());
}
@Test
public void testReserveNegativeMaxPending()
{
Pool<String> pool = new Pool<>(2, 0);
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), notNullValue());
assertThat(pool.reserve(-1), nullValue());
}
@Test
public void testClose()
{
Pool<String> pool = new Pool<>(1, 0);
pool.reserve(-1).enable("aaa");
assertThat(pool.isClosed(), is(false));
pool.close();
pool.close();
assertThat(pool.isClosed(), is(true));
assertThat(pool.size(), is(0));
assertThat(pool.acquire(), nullValue());
assertThat(pool.reserve(-1), nullValue());
}
@Test
public void testClosingCloseable()
{
AtomicBoolean closed = new AtomicBoolean();
Pool<Closeable> pool = new Pool<>(1,0);
Closeable pooled = () -> closed.set(true);
pool.reserve(-1).enable(pooled);
assertThat(closed.get(), is(false));
pool.close();
assertThat(closed.get(), is(true));
}
@Test
public void testRemove()
{
Pool<String> pool = new Pool<>(1, 0);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
assertThat(pool.remove(e1), is(false));
assertThat(pool.release(e1), is(false));
assertThat(pool.acquire(), nullValue());
assertThrows(NullPointerException.class, () -> pool.remove(null));
}
@Test
public void testValuesSize()
{
Pool<String> pool = new Pool<>(2, 0);
assertThat(pool.size(), is(0));
assertThat(pool.values().isEmpty(), is(true));
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
assertThat(pool.values().stream().map(Pool.Entry::getPooled).collect(toList()), equalTo(Arrays.asList("aaa", "bbb")));
assertThat(pool.size(), is(2));
}
@Test
public void testValuesContainsAcquiredEntries()
{
Pool<String> pool = new Pool<>(2, 0);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
assertThat(pool.acquire(), notNullValue());
assertThat(pool.acquire(), notNullValue());
assertThat(pool.acquire(), nullValue());
assertThat(pool.values().isEmpty(), is(false));
}
@Test
public void testAcquireAt()
{
Pool<String> pool = new Pool<>(2, 0);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
assertThat(pool.acquireAt(2), nullValue());
assertThat(pool.acquireAt(0), notNullValue());
assertThat(pool.acquireAt(0), nullValue());
assertThat(pool.acquireAt(1), notNullValue());
assertThat(pool.acquireAt(1), nullValue());
}
@Test
public void testMaxUsageCount()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.release(e1), is(true));
e1 = pool.acquire();
assertThat(pool.release(e1), is(true));
e1 = pool.acquire();
assertThat(pool.release(e1), is(false));
assertThat(pool.acquire(), nullValue());
assertThat(pool.size(), is(1));
assertThat(pool.remove(e1), is(true));
assertThat(pool.remove(e1), is(false));
assertThat(pool.size(), is(0));
Pool<String>.Entry e1Copy = e1;
assertThat(pool.release(e1Copy), is(false));
}
@Test
public void testMaxMultiplex()
{
Pool<String> pool = new Pool<>(2, 0);
pool.setMaxMultiplex(3);
pool.reserve(-1).enable("aaa");
pool.reserve(-1).enable("bbb");
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
Pool<String>.Entry e3 = pool.acquire();
Pool<String>.Entry e4 = pool.acquire();
assertThat(e1.getPooled(), equalTo("aaa"));
assertThat(e1, sameInstance(e2));
assertThat(e1, sameInstance(e3));
assertThat(e4.getPooled(), equalTo("bbb"));
assertThat(pool.release(e1), is(true));
Pool<String>.Entry e5 = pool.acquire();
assertThat(e2, sameInstance(e5));
Pool<String>.Entry e6 = pool.acquire();
assertThat(e4, sameInstance(e6));
}
@Test
public void testRemoveMultiplexed()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
assertThat(pool.remove(e1), is(false));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(false));
assertThat(pool.values().stream().findFirst().get().isClosed(), is(true));
assertThat(pool.remove(e1), is(true));
assertThat(pool.size(), is(0));
assertThat(pool.remove(e1), is(false));
assertThat(pool.release(e1), is(false));
assertThat(pool.remove(e1), is(false));
}
@Test
public void testMultiplexRemoveThenAcquireThenReleaseRemove()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
assertThat(pool.remove(e1), is(false));
assertThat(e1.isClosed(), is(true));
assertThat(pool.acquire(), nullValue());
assertThat(pool.release(e2), is(false));
assertThat(pool.remove(e2), is(true));
}
@Test
public void testNonMultiplexRemoveAfterAcquire()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.remove(e1), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testMultiplexRemoveAfterAcquire()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
Pool<String>.Entry e2 = pool.acquire();
assertThat(pool.remove(e1), is(false));
assertThat(pool.remove(e2), is(true));
assertThat(pool.size(), is(0));
assertThat(pool.release(e1), is(false));
assertThat(pool.size(), is(0));
Pool<String>.Entry e3 = pool.acquire();
assertThat(e3, nullValue());
assertThat(pool.release(e2), is(false));
assertThat(pool.size(), is(0));
}
@Test
public void testReleaseThenRemoveNonEnabledEntry()
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String>.Entry e = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.release(e), is(false));
assertThat(pool.size(), is(1));
assertThat(pool.remove(e), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testRemoveNonEnabledEntry()
{
Pool<String> pool = new Pool<>(1, 0);
Pool<String>.Entry e = pool.reserve(-1);
assertThat(pool.size(), is(1));
assertThat(pool.remove(e), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testMultiplexMaxUsageReachedAcquireThenRemove()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e0 = pool.acquire();
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.release(e1), is(true));
Pool<String>.Entry e2 = pool.acquire();
assertThat(pool.release(e2), is(true));
assertThat(pool.acquire(), nullValue());
assertThat(pool.remove(e0), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testMultiplexMaxUsageReachedAcquireThenReleaseThenRemove()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(3);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e0 = pool.acquire();
Pool<String>.Entry e1 = pool.acquire();
assertThat(pool.release(e1), is(true));
Pool<String>.Entry e2 = pool.acquire();
assertThat(pool.release(e2), is(true));
assertThat(pool.acquire(), nullValue());
assertThat(pool.release(e0), is(false));
assertThat(pool.values().stream().findFirst().get().isIdle(), is(true));
assertThat(pool.values().stream().findFirst().get().isClosed(), is(false));
assertThat(pool.size(), is(1));
assertThat(pool.remove(e0), is(true));
assertThat(pool.size(), is(0));
}
@Test
public void testUsageCountAfterReachingMaxMultiplexLimit()
{
Pool<String> pool = new Pool<>(1, 0);
pool.setMaxMultiplex(2);
pool.setMaxUsageCount(10);
pool.reserve(-1).enable("aaa");
Pool<String>.Entry e1 = pool.acquire();
assertThat(e1.getUsageCount(), is(1));
Pool<String>.Entry e2 = pool.acquire();
assertThat(e1.getUsageCount(), is(2));
assertThat(pool.acquire(), nullValue());
assertThat(e1.getUsageCount(), is(2));
}
@Test
public void testConfigLimits()
{
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(0));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxMultiplex(-1));
assertThrows(IllegalArgumentException.class, () -> new Pool<String>(1, 0).setMaxUsageCount(0));
}
}

View File

@ -114,6 +114,13 @@ public class URIUtilCanonicalPathTest
// paths with encoded segments should remain encoded
// canonicalPath() is not responsible for decoding characters
{"%2e%2e/", "%2e%2e/"},
{"/%2e%2e/", "/%2e%2e/"},
// paths with parameters are not elided
// canonicalPath() is not responsible for decoding characters
{"/foo/.;/bar", "/foo/.;/bar"},
{"/foo/..;/bar", "/foo/..;/bar"},
{"/foo/..;/..;/bar", "/foo/..;/..;/bar"},
};
ArrayList<Arguments> ret = new ArrayList<>();

View File

@ -1314,7 +1314,6 @@ public class WebAppContext extends ServletContextHandler implements WebAppClassL
setClassLoader(null);
}
setAvailable(true);
_unavailableException = null;
}
}

View File

@ -106,6 +106,11 @@
<artifactId>jetty-slf4j-impl</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-client</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.eclipse.jetty.toolchain</groupId>
<artifactId>jetty-test-helper</artifactId>

View File

@ -0,0 +1,174 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client.jmh;
import java.net.URI;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpConversation;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.MultiplexConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.RoundRobinConnectionPool;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.Attachable;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.Param;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.Setup;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
@State(Scope.Benchmark)
public class ConnectionPoolsBenchmark
{
private ConnectionPool pool;
@Param({"round-robin", "cached/multiplex", "uncached/multiplex", "cached/duplex", "uncached/duplex"})
public static String POOL_TYPE;
@Setup
public void setUp() throws Exception
{
HttpClient httpClient = new HttpClient()
{
@Override
protected void newConnection(HttpDestination destination, Promise<Connection> promise)
{
promise.succeeded(new MockConnection());
}
};
HttpDestination httpDestination = new HttpDestination(httpClient, new Origin("http", "localhost", 8080))
{
};
HttpConversation httpConversation = new HttpConversation();
HttpRequest httpRequest = new HttpRequest(httpClient, httpConversation, new URI("http://localhost:8080")) {};
HttpExchange httpExchange = new HttpExchange(httpDestination, httpRequest, new ArrayList<>());
httpDestination.getHttpExchanges().add(httpExchange);
int initialConnections = 12;
int maxConnections = 100;
switch (POOL_TYPE)
{
case "uncached/duplex":
pool = new DuplexConnectionPool(httpDestination, maxConnections, false, Callback.NOOP);
pool.preCreateConnections(initialConnections).get();
break;
case "cached/duplex":
pool = new DuplexConnectionPool(httpDestination, maxConnections, true, Callback.NOOP);
pool.preCreateConnections(initialConnections).get();
break;
case "uncached/multiplex":
pool = new MultiplexConnectionPool(httpDestination, maxConnections,false, Callback.NOOP, 12);
pool.preCreateConnections(initialConnections).get();
break;
case "cached/multiplex":
pool = new MultiplexConnectionPool(httpDestination, maxConnections,true, Callback.NOOP, 12);
pool.preCreateConnections(initialConnections).get();
break;
case "round-robin":
pool = new RoundRobinConnectionPool(httpDestination, maxConnections, Callback.NOOP);
pool.preCreateConnections(maxConnections).get();
break;
default:
throw new AssertionError("Unknown pool type: " + POOL_TYPE);
}
}
@TearDown
public void tearDown()
{
pool.close();
pool = null;
}
@Benchmark
public void testPool()
{
Connection connection = pool.acquire(true);
if (connection == null && !POOL_TYPE.equals("round-robin"))
throw new AssertionError("from thread " + Thread.currentThread().getName());
Blackhole.consumeCPU(ThreadLocalRandom.current().nextInt(10, 20));
if (connection != null)
pool.release(connection);
}
public static void main(String[] args) throws RunnerException
{
Options opt = new OptionsBuilder()
.include(ConnectionPoolsBenchmark.class.getSimpleName())
.warmupIterations(3)
.measurementIterations(3)
.forks(1)
.threads(12)
//.addProfiler(LinuxPerfProfiler.class)
.build();
new Runner(opt).run();
}
static class MockConnection implements Connection, Attachable
{
private Object attachment;
@Override
public void close()
{
}
@Override
public boolean isClosed()
{
return false;
}
@Override
public void send(Request request, Response.CompleteListener listener)
{
}
@Override
public void setAttachment(Object obj)
{
this.attachment = obj;
}
@Override
public Object getAttachment()
{
return attachment;
}
}
}