481116 - Introduce connection pooling also for HTTP/2 transport.

Implemented connection pooling for multiplexed transports.
Reworked the ConnectionPool code and its relationship with
HttpDestination.
This commit is contained in:
Simone Bordet 2015-10-30 15:33:12 +01:00
parent 0b95a9e23e
commit 0bd1e0ad7d
27 changed files with 1082 additions and 959 deletions

View File

@ -0,0 +1,199 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
{
private static final Logger LOG = Log.getLogger(AbstractConnectionPool.class);
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicInteger connectionCount = new AtomicInteger();
private final Destination destination;
private final int maxConnections;
private final Callback requester;
protected AbstractConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.maxConnections = maxConnections;
this.requester = requester;
}
@ManagedAttribute(value = "The max number of connections", readonly = true)
public int getMaxConnectionCount()
{
return maxConnections;
}
@ManagedAttribute(value = "The number of connections", readonly = true)
public int getConnectionCount()
{
return connectionCount.get();
}
@Override
public boolean isEmpty()
{
return connectionCount.get() == 0;
}
@Override
public boolean isClosed()
{
return closed.get();
}
@Override
public Connection acquire()
{
Connection connection = activate();
if (connection == null)
connection = tryCreate();
return connection;
}
private Connection tryCreate()
{
while (true)
{
int current = getConnectionCount();
final int next = current + 1;
if (next > maxConnections)
{
if (LOG.isDebugEnabled())
LOG.debug("Max connections {}/{} reached", current, maxConnections);
// Try again the idle connections
return activate();
}
if (connectionCount.compareAndSet(current, next))
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation", next, maxConnections);
destination.newConnection(new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
onCreated(connection);
proceed();
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
connectionCount.decrementAndGet();
requester.failed(x);
}
});
// Try again the idle connections
return activate();
}
}
}
protected abstract void onCreated(Connection connection);
protected void proceed()
{
requester.succeeded();
}
protected abstract Connection activate();
protected Connection active(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection active {}", connection);
acquired(connection);
return connection;
}
protected void acquired(Connection connection)
{
}
protected boolean idle(Connection connection, boolean close)
{
if (close)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle close {}", connection);
return false;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle {}", connection);
return true;
}
}
protected void released(Connection connection)
{
}
protected void removed(Connection connection)
{
int pooled = connectionCount.decrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
}
@Override
public void close()
{
if (closed.compareAndSet(false, true))
{
connectionCount.set(0);
}
}
protected void close(Collection<Connection> connections)
{
connections.forEach(Connection::close);
}
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
}
}

View File

@ -18,17 +18,24 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import java.io.Closeable;
/**
* @deprecated use {@link DuplexConnectionPool} instead
*/
@Deprecated
public class ConnectionPool extends DuplexConnectionPool
import org.eclipse.jetty.client.api.Connection;
public interface ConnectionPool extends Closeable
{
public ConnectionPool(Destination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, requester);
}
boolean isActive(Connection connection);
boolean isEmpty();
boolean isClosed();
Connection acquire();
boolean release(Connection connection);
boolean remove(Connection connection);
@Override
void close();
}

View File

@ -18,21 +18,20 @@
package org.eclipse.jetty.client;
import java.io.Closeable;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.Set;
import java.util.concurrent.locks.ReentrantLock;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -42,31 +41,29 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;
@ManagedObject("The connection pool")
public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepable
public class DuplexConnectionPool extends AbstractConnectionPool implements Dumpable, Sweeper.Sweepable
{
private static final Logger LOG = Log.getLogger(DuplexConnectionPool.class);
private final AtomicInteger connectionCount = new AtomicInteger();
private final ReentrantLock lock = new ReentrantLock();
private final Destination destination;
private final int maxConnections;
private final Callback requester;
private final Deque<Connection> idleConnections;
private final Queue<Connection> activeConnections;
private final Set<Connection> activeConnections;
public DuplexConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.maxConnections = maxConnections;
this.requester = requester;
this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
this.activeConnections = new BlockingArrayQueue<>(maxConnections);
super(destination, maxConnections, requester);
this.idleConnections = new ArrayDeque<>(maxConnections);
this.activeConnections = new HashSet<>(maxConnections);
}
@ManagedAttribute(value = "The number of connections", readonly = true)
public int getConnectionCount()
protected void lock()
{
return connectionCount.get();
lock.lock();
}
protected void unlock()
{
lock.unlock();
}
@ManagedAttribute(value = "The number of idle connections", readonly = true)
@ -102,139 +99,76 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
return idleConnections;
}
public Queue<Connection> getActiveConnections()
public Collection<Connection> getActiveConnections()
{
return activeConnections;
}
public Connection acquire()
@Override
public boolean isActive(Connection connection)
{
Connection connection = activateIdle();
if (connection == null)
connection = tryCreate();
return connection;
}
private Connection tryCreate()
{
while (true)
lock();
try
{
int current = getConnectionCount();
final int next = current + 1;
if (next > maxConnections)
{
if (LOG.isDebugEnabled())
LOG.debug("Max connections {}/{} reached", current, maxConnections);
// Try again the idle connections
return activateIdle();
}
if (connectionCount.compareAndSet(current, next))
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation", next, maxConnections);
destination.newConnection(new Promise<Connection>()
{
@Override
public void succeeded(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
idleCreated(connection);
proceed();
}
@Override
public void failed(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
connectionCount.decrementAndGet();
requester.failed(x);
}
});
// Try again the idle connections
return activateIdle();
}
return activeConnections.contains(connection);
}
finally
{
unlock();
}
}
protected void proceed()
@Override
protected void onCreated(Connection connection)
{
requester.succeeded();
}
protected void idleCreated(Connection connection)
{
boolean idle;
lock();
try
{
// Use "cold" new connections as last.
idle = idleConnections.offerLast(connection);
idleConnections.offer(connection);
}
finally
{
unlock();
}
idle(connection, idle);
idle(connection, false);
}
private Connection activateIdle()
@Override
protected Connection activate()
{
boolean acquired;
Connection connection;
lock();
try
{
connection = idleConnections.pollFirst();
connection = idleConnections.poll();
if (connection == null)
return null;
acquired = activeConnections.offer(connection);
activeConnections.add(connection);
}
finally
{
unlock();
}
if (acquired)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection active {}", connection);
acquired(connection);
return connection;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Connection active overflow {}", connection);
connection.close();
return null;
}
}
protected void acquired(Connection connection)
{
return active(connection);
}
public boolean release(Connection connection)
{
boolean idle;
boolean closed = isClosed();
lock();
try
{
if (!activeConnections.remove(connection))
return false;
// Make sure we use "hot" connections first.
idle = offerIdle(connection);
if (!closed)
{
// Make sure we use "hot" connections first.
deactivate(connection);
}
}
finally
{
@ -242,35 +176,14 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
}
released(connection);
return idle(connection, idle);
return idle(connection, closed);
}
protected boolean offerIdle(Connection connection)
protected boolean deactivate(Connection connection)
{
return idleConnections.offerFirst(connection);
}
protected boolean idle(Connection connection, boolean idle)
{
if (idle)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle {}", connection);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle overflow {}", connection);
connection.close();
return false;
}
}
protected void released(Connection connection)
{
}
public boolean remove(Connection connection)
{
return remove(connection, false);
@ -295,55 +208,21 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
released(connection);
boolean removed = activeRemoved || idleRemoved || force;
if (removed)
{
int pooled = connectionCount.decrementAndGet();
if (LOG.isDebugEnabled())
LOG.debug("Connection removed {} - pooled: {}", connection, pooled);
}
removed(connection);
return removed;
}
public boolean isActive(Connection connection)
{
lock();
try
{
return activeConnections.contains(connection);
}
finally
{
unlock();
}
}
public boolean isIdle(Connection connection)
{
lock();
try
{
return idleConnections.contains(connection);
}
finally
{
unlock();
}
}
public boolean isEmpty()
{
return connectionCount.get() == 0;
}
public void close()
{
List<Connection> idles = new ArrayList<>();
List<Connection> actives = new ArrayList<>();
super.close();
List<Connection> connections = new ArrayList<>();
lock();
try
{
idles.addAll(idleConnections);
connections.addAll(idleConnections);
idleConnections.clear();
actives.addAll(activeConnections);
connections.addAll(activeConnections);
activeConnections.clear();
}
finally
@ -351,32 +230,18 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
unlock();
}
connectionCount.set(0);
for (Connection connection : idles)
connection.close();
// A bit drastic, but we cannot wait for all requests to complete
for (Connection connection : actives)
connection.close();
}
@Override
public String dump()
{
return ContainerLifeCycle.dump(this);
close(connections);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Connection> actives = new ArrayList<>();
List<Connection> idles = new ArrayList<>();
List<Connection> connections = new ArrayList<>();
lock();
try
{
actives.addAll(activeConnections);
idles.addAll(idleConnections);
connections.addAll(activeConnections);
connections.addAll(idleConnections);
}
finally
{
@ -384,7 +249,7 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
}
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, actives, idles);
ContainerLifeCycle.dump(out, indent, connections);
}
@Override
@ -422,16 +287,6 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
return false;
}
protected void lock()
{
lock.lock();
}
protected void unlock()
{
lock.unlock();
}
@Override
public String toString()
{
@ -450,8 +305,8 @@ public class DuplexConnectionPool implements Closeable, Dumpable, Sweeper.Sweepa
return String.format("%s[c=%d/%d,a=%d,i=%d]",
getClass().getSimpleName(),
connectionCount.get(),
maxConnections,
getConnectionCount(),
getMaxConnectionCount(),
activeSize,
idleSize);
}

View File

@ -22,18 +22,21 @@ import java.io.Closeable;
import java.io.IOException;
import java.nio.channels.AsynchronousCloseException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.io.ClientConnectionFactory;
import org.eclipse.jetty.io.ssl.SslClientConnectionFactory;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
@ -41,9 +44,10 @@ import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Sweeper;
@ManagedObject
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Dumpable
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
{
protected static final Logger LOG = Log.getLogger(HttpDestination.class);
@ -55,6 +59,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
private final ProxyConfiguration.Proxy proxy;
private final ClientConnectionFactory connectionFactory;
private final HttpField hostField;
private ConnectionPool connectionPool;
public HttpDestination(HttpClient client, Origin origin)
{
@ -86,6 +91,29 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
hostField = new HttpField(HttpHeader.HOST, host);
}
@Override
protected void doStart() throws Exception
{
this.connectionPool = newConnectionPool(client);
addBean(connectionPool);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
sweeper.offer((Sweeper.Sweepable)connectionPool);
}
@Override
protected void doStop() throws Exception
{
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
sweeper.remove((Sweeper.Sweepable)connectionPool);
super.doStop();
removeBean(connectionPool);
}
protected abstract ConnectionPool newConnectionPool(HttpClient client);
protected Queue<HttpExchange> newExchangeQueue(HttpClient client)
{
return new BlockingArrayQueue<>(client.getMaxRequestsQueuedPerDestination());
@ -175,6 +203,24 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
return hostField;
}
@ManagedAttribute(value = "The connection pool", readonly = true)
public ConnectionPool getConnectionPool()
{
return connectionPool;
}
@Override
public void succeeded()
{
send();
}
@Override
public void failed(Throwable x)
{
abort(x);
}
protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
{
if (!getScheme().equalsIgnoreCase(request.getScheme()))
@ -221,7 +267,59 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
return queue.offer(exchange);
}
public abstract void send();
public void send()
{
if (getHttpExchanges().isEmpty())
return;
process();
}
private void process()
{
Connection connection = connectionPool.acquire();
if (connection != null)
process(connection);
}
public void process(final Connection connection)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
if (exchange == null)
{
if (!connectionPool.release(connection))
connection.close();
if (!client.isRunning())
{
if (LOG.isDebugEnabled())
LOG.debug("{} is stopping", client);
connection.close();
}
}
else
{
final Request request = exchange.getRequest();
Throwable cause = request.getAbortCause();
if (cause != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
}
else
{
send(connection, exchange);
}
}
}
protected abstract void send(Connection connection, HttpExchange exchange);
public void newConnection(Promise<Connection> promise)
{
@ -243,14 +341,67 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
abort(new AsynchronousCloseException());
if (LOG.isDebugEnabled())
LOG.debug("Closed {}", this);
connectionPool.close();
}
public void release(Connection connection)
{
if (LOG.isDebugEnabled())
LOG.debug("Released {}", connection);
HttpClient client = getHttpClient();
if (client.isRunning())
{
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
else
connection.close();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Released explicit {}", connection);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} is stopped", client);
connection.close();
}
}
public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}
public void close(Connection connection)
{
boolean removed = remove(connection);
if (getHttpExchanges().isEmpty())
{
if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
{
// There is a race condition between this thread removing the destination
// and another thread queueing a request to this same destination.
// If this destination is removed, but the request queued, a new connection
// will be opened, the exchange will be executed and eventually the connection
// will idle timeout and be closed. Meanwhile a new destination will be created
// in HttpClient and will be used for other requests.
getHttpClient().removeDestination(this);
}
}
else
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
}
}
/**
@ -278,6 +429,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
public void dump(Appendable out, String indent) throws IOException
{
ContainerLifeCycle.dumpObject(out, toString());
ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
}
public String asString()
@ -288,11 +440,12 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
@Override
public String toString()
{
return String.format("%s[%s]%x%s,queue=%d",
return String.format("%s[%s]%x%s,queue=%d,pool=%s",
HttpDestination.class.getSimpleName(),
asString(),
hashCode(),
proxy == null ? "" : "(via " + proxy + ")",
exchanges.size());
exchanges.size(),
connectionPool);
}
}

View File

@ -25,7 +25,7 @@ import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class LeakTrackingConnectionPool extends ConnectionPool
public class LeakTrackingConnectionPool extends DuplexConnectionPool
{
private static final Logger LOG = Log.getLogger(LeakTrackingConnectionPool.class);

View File

@ -0,0 +1,302 @@
//
// ========================================================================
// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
public class MultiplexConnectionPool extends AbstractConnectionPool
{
private static final Logger LOG = Log.getLogger(MultiplexConnectionPool.class);
private final ReentrantLock lock = new ReentrantLock();
private final int maxMultiplexed;
private final Deque<Holder> idleConnections;
private final Map<Connection, Holder> muxedConnections;
private final Map<Connection, Holder> busyConnections;
public MultiplexConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplexed)
{
super(destination, maxConnections, requester);
this.maxMultiplexed = maxMultiplexed;
this.idleConnections = new ArrayDeque<>(maxConnections);
this.muxedConnections = new HashMap<>(maxConnections);
this.busyConnections = new HashMap<>(maxConnections);
}
protected void lock()
{
lock.lock();
}
protected void unlock()
{
lock.unlock();
}
@Override
public boolean isActive(Connection connection)
{
lock();
try
{
if (muxedConnections.containsKey(connection))
return true;
if (busyConnections.containsKey(connection))
return true;
return false;
}
finally
{
unlock();
}
}
@Override
protected void onCreated(Connection connection)
{
lock();
try
{
// Use "cold" connections as last.
idleConnections.offer(new Holder(connection));
}
finally
{
unlock();
}
idle(connection, false);
}
@Override
protected Connection activate()
{
Holder holder;
lock();
try
{
while (true)
{
if (muxedConnections.isEmpty())
{
holder = idleConnections.poll();
if (holder == null)
return null;
muxedConnections.put(holder.connection, holder);
}
else
{
holder = muxedConnections.values().iterator().next();
}
if (holder.count < maxMultiplexed)
{
++holder.count;
break;
}
else
{
muxedConnections.remove(holder.connection);
busyConnections.put(holder.connection, holder);
}
}
}
finally
{
unlock();
}
return active(holder.connection);
}
@Override
public boolean release(Connection connection)
{
boolean closed = isClosed();
boolean idle = false;
Holder holder;
lock();
try
{
holder = muxedConnections.get(connection);
if (holder != null)
{
int count = --holder.count;
if (count == 0)
{
muxedConnections.remove(connection);
if (!closed)
{
idleConnections.offerFirst(holder);
idle = true;
}
}
}
else
{
holder = busyConnections.remove(connection);
if (holder != null)
{
int count = --holder.count;
if (!closed)
{
if (count == 0)
{
idleConnections.offerFirst(holder);
idle = true;
}
else
{
muxedConnections.put(connection, holder);
}
}
}
}
}
finally
{
unlock();
}
if (holder == null)
return false;
released(connection);
if (idle || closed)
return idle(connection, closed);
return true;
}
@Override
public boolean remove(Connection connection)
{
return remove(connection, false);
}
protected boolean remove(Connection connection, boolean force)
{
boolean activeRemoved = true;
boolean idleRemoved = false;
lock();
try
{
Holder holder = muxedConnections.remove(connection);
if (holder == null)
holder = busyConnections.remove(connection);
if (holder == null)
{
activeRemoved = false;
for (Iterator<Holder> iterator = idleConnections.iterator(); iterator.hasNext();)
{
holder = iterator.next();
if (holder.connection == connection)
{
idleRemoved = true;
iterator.remove();
break;
}
}
}
}
finally
{
unlock();
}
if (activeRemoved || force)
released(connection);
boolean removed = activeRemoved || idleRemoved || force;
if (removed)
removed(connection);
return removed;
}
@Override
public void close()
{
super.close();
List<Connection> connections;
lock();
try
{
connections = idleConnections.stream().map(holder -> holder.connection).collect(Collectors.toList());
connections.addAll(muxedConnections.keySet());
connections.addAll(busyConnections.keySet());
}
finally
{
unlock();
}
close(connections);
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
List<Holder> connections = new ArrayList<>();
lock();
try
{
connections.addAll(busyConnections.values());
connections.addAll(muxedConnections.values());
connections.addAll(idleConnections);
}
finally
{
unlock();
}
ContainerLifeCycle.dumpObject(out, this);
ContainerLifeCycle.dump(out, indent, connections);
}
private static class Holder
{
private final Connection connection;
private int count;
private Holder(Connection connection)
{
this.connection = connection;
}
@Override
public String toString()
{
return String.format("%s[%d]", connection, count);
}
}
}

View File

@ -18,136 +18,16 @@
package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Promise;
public abstract class MultiplexHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
public abstract class MultiplexHttpDestination extends HttpDestination
{
private final AtomicReference<ConnectState> connect = new AtomicReference<>(ConnectState.DISCONNECTED);
private C connection;
protected MultiplexHttpDestination(HttpClient client, Origin origin)
{
super(client, origin);
}
@Override
public void send()
protected ConnectionPool newConnectionPool(HttpClient client)
{
while (true)
{
ConnectState current = connect.get();
switch (current)
{
case DISCONNECTED:
{
if (!connect.compareAndSet(current, ConnectState.CONNECTING))
break;
newConnection(this);
return;
}
case CONNECTING:
{
// Waiting to connect, just return
return;
}
case CONNECTED:
{
if (process(connection))
break;
return;
}
default:
{
abort(new IllegalStateException("Invalid connection state " + current));
return;
}
}
}
}
@Override
@SuppressWarnings("unchecked")
public void succeeded(Connection result)
{
C connection = this.connection = (C)result;
if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
{
process(connection);
}
else
{
connection.close();
failed(new IllegalStateException());
}
}
@Override
public void failed(Throwable x)
{
connect.set(ConnectState.DISCONNECTED);
abort(x);
}
protected boolean process(final C connection)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing {} on {}", exchange, connection);
if (exchange == null)
return false;
final Request request = exchange.getRequest();
Throwable cause = request.getAbortCause();
if (cause != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
}
else
{
send(connection, exchange);
}
return true;
}
@Override
public void close()
{
super.close();
C connection = this.connection;
if (connection != null)
connection.close();
}
@Override
public void close(Connection connection)
{
super.close(connection);
while (true)
{
ConnectState current = connect.get();
if (connect.compareAndSet(current, ConnectState.DISCONNECTED))
{
if (getHttpClient().isRemoveIdleDestinations())
getHttpClient().removeDestination(this);
break;
}
}
}
protected abstract void send(C connection, HttpExchange exchange);
private enum ConnectState
{
DISCONNECTED, CONNECTING, CONNECTED
return new MultiplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this,
client.getMaxRequestsQueuedPerDestination());
}
}

View File

@ -18,229 +18,15 @@
package org.eclipse.jetty.client;
import java.io.IOException;
import java.util.Collections;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.thread.Sweeper;
@ManagedObject
public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
public abstract class PoolingHttpDestination extends HttpDestination
{
private DuplexConnectionPool connectionPool;
public PoolingHttpDestination(HttpClient client, Origin origin)
{
super(client, origin);
this.connectionPool = newConnectionPool(client);
addBean(connectionPool);
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null)
sweeper.offer(connectionPool);
}
@Override
protected void doStart() throws Exception
{
HttpClient client = getHttpClient();
this.connectionPool = newConnectionPool(client);
addBean(connectionPool);
super.doStart();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null)
sweeper.offer(connectionPool);
}
@Override
protected void doStop() throws Exception
{
HttpClient client = getHttpClient();
Sweeper sweeper = client.getBean(Sweeper.class);
if (sweeper != null)
sweeper.remove(connectionPool);
super.doStop();
removeBean(connectionPool);
}
protected DuplexConnectionPool newConnectionPool(HttpClient client)
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this);
}
@ManagedAttribute(value = "The connection pool", readonly = true)
public DuplexConnectionPool getConnectionPool()
{
return connectionPool;
}
@Override
public void succeeded()
{
send();
}
@Override
public void failed(final Throwable x)
{
abort(x);
}
public void send()
{
if (getHttpExchanges().isEmpty())
return;
process();
}
@SuppressWarnings("unchecked")
public C acquire()
{
return (C)connectionPool.acquire();
}
private void process()
{
C connection = acquire();
if (connection != null)
process(connection);
}
/**
* <p>Processes a new connection making it idle or active depending on whether requests are waiting to be sent.</p>
* <p>A new connection is created when a request needs to be executed; it is possible that the request that
* triggered the request creation is executed by another connection that was just released, so the new connection
* may become idle.</p>
* <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
*
* @param connection the new connection
*/
public void process(final C connection)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
if (LOG.isDebugEnabled())
LOG.debug("Processing exchange {} on {} of {}", exchange, connection, this);
if (exchange == null)
{
if (!connectionPool.release(connection))
connection.close();
if (!client.isRunning())
{
if (LOG.isDebugEnabled())
LOG.debug("{} is stopping", client);
connection.close();
}
}
else
{
final Request request = exchange.getRequest();
Throwable cause = request.getAbortCause();
if (cause != null)
{
if (LOG.isDebugEnabled())
LOG.debug("Aborted before processing {}: {}", exchange, cause);
// It may happen that the request is aborted before the exchange
// is created. Aborting the exchange a second time will result in
// a no-operation, so we just abort here to cover that edge case.
exchange.abort(cause);
}
else
{
send(connection, exchange);
}
}
}
protected abstract void send(C connection, HttpExchange exchange);
@Override
public void release(Connection c)
{
@SuppressWarnings("unchecked")
C connection = (C)c;
if (LOG.isDebugEnabled())
LOG.debug("Released {}", connection);
HttpClient client = getHttpClient();
if (client.isRunning())
{
if (connectionPool.isActive(connection))
{
if (connectionPool.release(connection))
send();
else
connection.close();
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Released explicit {}", connection);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("{} is stopped", client);
connection.close();
}
}
public boolean remove(Connection connection)
{
return connectionPool.remove(connection);
}
@Override
public void close(Connection connection)
{
super.close(connection);
boolean removed = remove(connection);
if (getHttpExchanges().isEmpty())
{
if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
{
// There is a race condition between this thread removing the destination
// and another thread queueing a request to this same destination.
// If this destination is removed, but the request queued, a new connection
// will be opened, the exchange will be executed and eventually the connection
// will idle timeout and be closed. Meanwhile a new destination will be created
// in HttpClient and will be used for other requests.
getHttpClient().removeDestination(this);
}
}
else
{
// We need to execute queued requests even if this connection failed.
// We may create a connection that is not needed, but it will eventually
// idle timeout, so no worries.
if (removed)
process();
}
}
public void close()
{
super.close();
connectionPool.close();
}
@Override
public void dump(Appendable out, String indent) throws IOException
{
super.dump(out, indent);
ContainerLifeCycle.dump(out, indent, Collections.singletonList(connectionPool));
}
@Override
public String toString()
{
return String.format("%s,pool=%s", super.toString(), connectionPool);
}
}

View File

@ -56,7 +56,7 @@ import org.eclipse.jetty.util.thread.Scheduler;
* tuning the idle timeout of the servers to be larger than
* that of the client.</p>
*/
public class ValidatingConnectionPool extends ConnectionPool
public class ValidatingConnectionPool extends DuplexConnectionPool
{
private static final Logger LOG = Log.getLogger(ValidatingConnectionPool.class);
@ -154,7 +154,7 @@ public class ValidatingConnectionPool extends ConnectionPool
private class Holder implements Runnable
{
private final long timestamp = System.nanoTime();
private final AtomicBoolean latch = new AtomicBoolean();
private final AtomicBoolean done = new AtomicBoolean();
private final Connection connection;
public Scheduler.Task task;
@ -166,30 +166,31 @@ public class ValidatingConnectionPool extends ConnectionPool
@Override
public void run()
{
if (latch.compareAndSet(false, true))
if (done.compareAndSet(false, true))
{
boolean idle;
boolean closed = isClosed();
lock();
try
{
quarantine.remove(connection);
idle = offerIdle(connection);
if (LOG.isDebugEnabled())
LOG.debug("Validated {}", connection);
quarantine.remove(connection);
if (!closed)
deactivate(connection);
}
finally
{
unlock();
}
if (idle(connection, idle))
proceed();
idle(connection, closed);
proceed();
}
}
public boolean cancel()
{
if (latch.compareAndSet(false, true))
if (done.compareAndSet(false, true))
{
task.cancel();
return true;

View File

@ -22,8 +22,9 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PoolingHttpDestination;
import org.eclipse.jetty.client.api.Connection;
public class HttpDestinationOverHTTP extends PoolingHttpDestination<HttpConnectionOverHTTP>
public class HttpDestinationOverHTTP extends PoolingHttpDestination
{
public HttpDestinationOverHTTP(HttpClient client, Origin origin)
{
@ -31,8 +32,8 @@ public class HttpDestinationOverHTTP extends PoolingHttpDestination<HttpConnecti
}
@Override
protected void send(HttpConnectionOverHTTP connection, HttpExchange exchange)
protected void send(Connection connection, HttpExchange exchange)
{
connection.send(exchange);
((HttpConnectionOverHTTP)connection).send(exchange);
}
}

View File

@ -59,7 +59,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertEquals(200, response.getStatus());
HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination;
DuplexConnectionPool connectionPool = httpDestination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)httpDestination.getConnectionPool();
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}
@ -94,7 +94,7 @@ public class HttpClientExplicitConnectionTest extends AbstractHttpClientServerTe
Assert.assertFalse(httpConnection.getEndPoint().isOpen());
HttpDestinationOverHTTP httpDestination = (HttpDestinationOverHTTP)destination;
DuplexConnectionPool connectionPool = httpDestination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)httpDestination.getConnectionPool();
Assert.assertTrue(connectionPool.getActiveConnections().isEmpty());
Assert.assertTrue(connectionPool.getIdleConnections().isEmpty());
}

View File

@ -25,9 +25,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.util.DeferredContentProvider;
@ -89,14 +86,7 @@ public class HttpClientFailureTest
try
{
client.newRequest("localhost", connector.getLocalPort())
.onRequestHeaders(new Request.HeadersListener()
{
@Override
public void onHeaders(Request request)
{
connectionRef.get().getEndPoint().close();
}
})
.onRequestHeaders(request -> connectionRef.get().getEndPoint().close())
.timeout(5, TimeUnit.SECONDS)
.send();
Assert.fail();
@ -106,7 +96,7 @@ public class HttpClientFailureTest
// Expected.
}
DuplexConnectionPool connectionPool = connectionRef.get().getHttpDestination().getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -134,25 +124,17 @@ public class HttpClientFailureTest
final CountDownLatch completeLatch = new CountDownLatch(1);
DeferredContentProvider content = new DeferredContentProvider();
client.newRequest("localhost", connector.getLocalPort())
.onRequestCommit(new Request.CommitListener()
.onRequestCommit(request ->
{
@Override
public void onCommit(Request request)
{
connectionRef.get().getEndPoint().close();
commitLatch.countDown();
}
connectionRef.get().getEndPoint().close();
commitLatch.countDown();
})
.content(content)
.idleTimeout(2, TimeUnit.SECONDS)
.send(new Response.CompleteListener()
.send(result ->
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
if (result.isFailed())
completeLatch.countDown();
});
Assert.assertTrue(commitLatch.await(5, TimeUnit.SECONDS));
@ -170,7 +152,7 @@ public class HttpClientFailureTest
Assert.assertTrue(contentLatch.await(5, TimeUnit.SECONDS));
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
DuplexConnectionPool connectionPool = connectionRef.get().getHttpDestination().getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)connectionRef.get().getHttpDestination().getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());

View File

@ -111,7 +111,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
long start = System.nanoTime();
HttpConnectionOverHTTP connection = null;
@ -367,16 +367,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final byte[] content = {0, 1, 2, 3};
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort())
.onRequestContent(new Request.ContentListener()
.onRequestContent((request, buffer) ->
{
@Override
public void onContent(Request request, ByteBuffer buffer)
{
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
if (!Arrays.equals(content, bytes))
request.abort(new Exception());
}
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
if (!Arrays.equals(content, bytes))
request.abort(new Exception());
})
.content(new BytesContentProvider(content))
.timeout(5, TimeUnit.SECONDS)
@ -401,16 +397,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final AtomicInteger progress = new AtomicInteger();
ContentResponse response = client.POST(scheme + "://localhost:" + connector.getLocalPort())
.onRequestContent(new Request.ContentListener()
.onRequestContent((request, buffer) ->
{
@Override
public void onContent(Request request, ByteBuffer buffer)
{
byte[] bytes = new byte[buffer.remaining()];
Assert.assertEquals(1, bytes.length);
buffer.get(bytes);
Assert.assertEquals(bytes[0], progress.getAndIncrement());
}
byte[] bytes = new byte[buffer.remaining()];
Assert.assertEquals(1, bytes.length);
buffer.get(bytes);
Assert.assertEquals(bytes[0], progress.getAndIncrement());
})
.content(new BytesContentProvider(new byte[]{0}, new byte[]{1}, new byte[]{2}, new byte[]{3}, new byte[]{4}))
.timeout(5, TimeUnit.SECONDS)
@ -432,19 +424,15 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final CountDownLatch successLatch = new CountDownLatch(2);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestBegin(new Request.BeginListener()
.onRequestBegin(request ->
{
@Override
public void onBegin(Request request)
try
{
try
{
latch.await();
}
catch (InterruptedException x)
{
x.printStackTrace();
}
latch.await();
}
catch (InterruptedException x)
{
x.printStackTrace();
}
})
.send(new Response.Listener.Adapter()
@ -459,14 +447,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestQueued(new Request.QueuedListener()
{
@Override
public void onQueued(Request request)
{
latch.countDown();
}
})
.onRequestQueued(request -> latch.countDown())
.send(new Response.Listener.Adapter()
{
@Override
@ -514,27 +495,16 @@ public class HttpClientTest extends AbstractHttpClientServerTest
latch.countDown();
}
})
.onResponseFailure(new Response.FailureListener()
{
@Override
public void onFailure(Response response, Throwable failure)
{
latch.countDown();
}
})
.onResponseFailure((response, failure) -> latch.countDown())
.send(null);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/two")
.onResponseSuccess(new Response.SuccessListener()
.onResponseSuccess(response ->
{
@Override
public void onSuccess(Response response)
{
Assert.assertEquals(200, response.getStatus());
latch.countDown();
}
Assert.assertEquals(200, response.getStatus());
latch.countDown();
})
.send(null);
@ -564,14 +534,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.file(file)
.onRequestSuccess(new Request.SuccessListener()
.onRequestSuccess(request ->
{
@Override
public void onSuccess(Request request)
{
requestTime.set(System.nanoTime());
latch.countDown();
}
requestTime.set(System.nanoTime());
latch.countDown();
})
.send(new Response.Listener.Adapter()
{
@ -674,14 +640,11 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final int port = connector.getLocalPort();
client.newRequest(host, port)
.scheme(scheme)
.onRequestBegin(new Request.BeginListener()
.onRequestBegin(request ->
{
@Override
public void onBegin(Request request)
{
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
destination.getConnectionPool().getActiveConnections().peek().close();
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
connectionPool.getActiveConnections().iterator().next().close();
})
.send(new Response.Listener.Adapter()
{
@ -773,14 +736,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest
ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onResponseHeader(new Response.HeaderListener()
{
@Override
public boolean onHeader(Response response, HttpField field)
{
return !field.getName().equals(headerName);
}
})
.onResponseHeader((response1, field) -> !field.getName().equals(headerName))
.timeout(5, TimeUnit.SECONDS)
.send();
@ -864,16 +820,12 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("idontexist", 80)
.send(new Response.CompleteListener()
.send(result ->
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Throwable failure = result.getFailure();
Assert.assertTrue(failure instanceof UnknownHostException);
latch.countDown();
}
Assert.assertTrue(result.isFailed());
Throwable failure = result.getFailure();
Assert.assertTrue(failure instanceof UnknownHostException);
latch.countDown();
});
Assert.assertTrue(latch.await(10, TimeUnit.SECONDS));
}
@ -1323,14 +1275,10 @@ public class HttpClientTest extends AbstractHttpClientServerTest
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.CompleteListener()
.send(result ->
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
completeLatch.countDown();
}
if (result.isFailed())
completeLatch.countDown();
});
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));

View File

@ -31,8 +31,6 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpChannelOverHTTP;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
@ -121,14 +119,7 @@ public class HttpClientUploadDuringServerShutdown
int length = 16 * 1024 * 1024 + random.nextInt(16 * 1024 * 1024);
client.newRequest("localhost", 8888)
.content(new BytesContentProvider(new byte[length]))
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
latch.countDown();
}
});
.send(result -> latch.countDown());
long sleep = 1 + random.nextInt(10);
TimeUnit.MILLISECONDS.sleep(sleep);
}
@ -244,35 +235,24 @@ public class HttpClientUploadDuringServerShutdown
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.timeout(10, TimeUnit.SECONDS)
.onRequestBegin(new org.eclipse.jetty.client.api.Request.BeginListener()
.onRequestBegin(request ->
{
@Override
public void onBegin(org.eclipse.jetty.client.api.Request request)
try
{
try
{
beginLatch.countDown();
completeLatch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
beginLatch.countDown();
completeLatch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completeLatch.countDown();
}
});
.send(result -> completeLatch.countDown());
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", connector.getLocalPort());
DuplexConnectionPool pool = destination.getConnectionPool();
DuplexConnectionPool pool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, pool.getConnectionCount());
Assert.assertEquals(0, pool.getIdleConnections().size());
Assert.assertEquals(0, pool.getActiveConnections().size());

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.Arrays;
import java.util.Collection;
import java.util.Queue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@ -69,35 +70,24 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch headersLatch = new CountDownLatch(1);
final CountDownLatch successLatch = new CountDownLatch(3);
client.newRequest(host, port)
.scheme(scheme)
.onRequestSuccess(new Request.SuccessListener()
.onRequestSuccess(request -> successLatch.countDown())
.onResponseHeaders(response ->
{
@Override
public void onSuccess(Request request)
{
successLatch.countDown();
}
})
.onResponseHeaders(new Response.HeadersListener()
{
@Override
public void onHeaders(Response response)
{
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(1, activeConnections.size());
headersLatch.countDown();
}
Assert.assertEquals(0, idleConnections.size());
Assert.assertEquals(1, activeConnections.size());
headersLatch.countDown();
})
.send(new Response.Listener.Adapter()
{
@ -130,12 +120,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch beginLatch = new CountDownLatch(1);
@ -145,7 +135,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
@Override
public void onBegin(Request request)
{
activeConnections.peek().close();
activeConnections.iterator().next().close();
beginLatch.countDown();
}
@ -181,12 +171,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch successLatch = new CountDownLatch(3);
@ -241,12 +231,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final long delay = 1000;
@ -314,12 +304,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
server.stop();
@ -327,22 +317,11 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
final CountDownLatch failureLatch = new CountDownLatch(2);
client.newRequest(host, port)
.scheme(scheme)
.onRequestFailure(new Request.FailureListener()
.onRequestFailure((request, failure) -> failureLatch.countDown())
.send(result ->
{
@Override
public void onFailure(Request request, Throwable failure)
{
failureLatch.countDown();
}
})
.send(new Response.Listener.Adapter()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
failureLatch.countDown();
}
Assert.assertTrue(result.isFailed());
failureLatch.countDown();
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));
@ -367,12 +346,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
final CountDownLatch latch = new CountDownLatch(1);
@ -417,12 +396,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
Log.getLogger(HttpConnection.class).info("Expecting java.lang.IllegalStateException: HttpParser{s=CLOSED,...");
@ -467,12 +446,12 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
ContentResponse response = client.newRequest(host, port)
@ -499,25 +478,21 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
String host = "localhost";
int port = connector.getLocalPort();
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
final Queue<Connection> idleConnections = connectionPool.getIdleConnections();
final Collection<Connection> idleConnections = connectionPool.getIdleConnections();
Assert.assertEquals(0, idleConnections.size());
final Queue<Connection> activeConnections = connectionPool.getActiveConnections();
final Collection<Connection> activeConnections = connectionPool.getActiveConnections();
Assert.assertEquals(0, activeConnections.size());
client.setStrictEventOrdering(false);
ContentResponse response = client.newRequest(host, port)
.scheme(scheme)
.onResponseBegin(new Response.BeginListener()
.onResponseBegin(response1 ->
{
@Override
public void onBegin(Response response)
{
// Simulate a HTTP 1.0 response has been received.
((HttpResponse)response).version(HttpVersion.HTTP_1_0);
}
// Simulate a HTTP 1.0 response has been received.
((HttpResponse)response1).version(HttpVersion.HTTP_1_0);
})
.send();

View File

@ -25,12 +25,12 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.ByteBufferContentProvider;
@ -88,7 +88,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -135,7 +135,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -182,7 +182,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -204,14 +204,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestCommit(new Request.CommitListener()
.onRequestCommit(request ->
{
@Override
public void onCommit(Request request)
{
aborted.set(request.abort(cause));
latch.countDown();
}
aborted.set(request.abort(cause));
latch.countDown();
})
.timeout(5, TimeUnit.SECONDS)
.send();
@ -225,7 +221,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -260,14 +256,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestCommit(new Request.CommitListener()
.onRequestCommit(request ->
{
@Override
public void onCommit(Request request)
{
aborted.set(request.abort(cause));
latch.countDown();
}
aborted.set(request.abort(cause));
latch.countDown();
})
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
{
@ -289,7 +281,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -315,14 +307,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestContent(new Request.ContentListener()
.onRequestContent((request, content) ->
{
@Override
public void onContent(Request request, ByteBuffer content)
{
aborted.set(request.abort(cause));
latch.countDown();
}
aborted.set(request.abort(cause));
latch.countDown();
})
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
{
@ -344,7 +332,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -454,7 +442,7 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
@ -486,15 +474,11 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
Request request = client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.timeout(3 * delay, TimeUnit.MILLISECONDS);
request.send(new Response.CompleteListener()
request.send(result ->
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertSame(cause, result.getFailure());
latch.countDown();
}
Assert.assertTrue(result.isFailed());
Assert.assertSame(cause, result.getFailure());
latch.countDown();
});
TimeUnit.MILLISECONDS.sleep(delay);

View File

@ -151,7 +151,7 @@ public class ServerConnectionCloseTest
// Connection should have been removed from pool.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getIdleConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnectionCount());

View File

@ -183,7 +183,7 @@ public class TLSServerConnectionCloseTest
// Connection should have been removed from pool.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
DuplexConnectionPool connectionPool = destination.getConnectionPool();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getIdleConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnectionCount());

View File

@ -24,6 +24,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.AbstractHttpClientServerTest;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.EmptyServerHandler;
import org.eclipse.jetty.client.HttpClient;
@ -31,9 +32,6 @@ import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
import org.eclipse.jetty.util.ssl.SslContextFactory;
@ -59,11 +57,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
public void test_FirstAcquire_WithEmptyQueue() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
Connection connection = destination.acquire();
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection = connectionPool.acquire();
if (connection == null)
{
// There are no queued requests, so the newly created connection will be idle
connection = timedPoll(destination.getConnectionPool().getIdleConnections(), 5, TimeUnit.SECONDS);
connection = timedPoll(connectionPool.getIdleConnections(), 5, TimeUnit.SECONDS);
}
Assert.assertNotNull(connection);
}
@ -72,7 +72,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
Connection connection1 = destination.acquire();
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
@ -80,11 +82,11 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = destination.getConnectionPool().getIdleConnections().peek();
connection1 = connectionPool.getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
Connection connection2 = destination.acquire();
Connection connection2 = connectionPool.acquire();
Assert.assertSame(connection1, connection2);
}
}
@ -97,18 +99,18 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
protected DuplexConnectionPool newConnectionPool(HttpClient client)
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new DuplexConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void idleCreated(Connection connection)
protected void onCreated(Connection connection)
{
try
{
idleLatch.countDown();
latch.await(5, TimeUnit.SECONDS);
super.idleCreated(connection);
super.onCreated(connection);
}
catch (InterruptedException x)
{
@ -118,7 +120,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
};
}
};
Connection connection1 = destination.acquire();
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
// Make sure we entered idleCreated().
Assert.assertTrue(idleLatch.await(5, TimeUnit.SECONDS));
@ -128,13 +132,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
Assert.assertNull(connection1);
// Second attempt also returns null because we delayed idleCreated() above.
Connection connection2 = destination.acquire();
Connection connection2 = connectionPool.acquire();
Assert.assertNull(connection2);
latch.countDown();
// There must be 2 idle connections.
Queue<Connection> idleConnections = destination.getConnectionPool().getIdleConnections();
Queue<Connection> idleConnections = connectionPool.getIdleConnections();
Connection connection = timedPoll(idleConnections, 5, TimeUnit.SECONDS);
Assert.assertNotNull(connection);
connection = timedPoll(idleConnections, 5, TimeUnit.SECONDS);
@ -145,23 +149,25 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
public void test_Acquire_Process_Release_Acquire_ReturnsSameConnection() throws Exception
{
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
HttpConnectionOverHTTP connection1 = destination.acquire();
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
HttpConnectionOverHTTP connection1 = (HttpConnectionOverHTTP)connectionPool.acquire();
long start = System.nanoTime();
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = (HttpConnectionOverHTTP)destination.getConnectionPool().getIdleConnections().peek();
connection1 = (HttpConnectionOverHTTP)connectionPool.getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
// Acquire the connection to make it active
Assert.assertSame(connection1, destination.acquire());
Assert.assertSame(connection1, connectionPool.acquire());
destination.process(connection1);
destination.release(connection1);
Connection connection2 = destination.acquire();
Connection connection2 = connectionPool.acquire();
Assert.assertSame(connection1, connection2);
}
@ -172,7 +178,9 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
client.setIdleTimeout(idleTimeout);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()));
Connection connection1 = destination.acquire();
destination.start();
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Connection connection1 = connectionPool.acquire();
if (connection1 == null)
{
// There are no queued requests, so the newly created connection will be idle
@ -180,13 +188,13 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5)
{
TimeUnit.MILLISECONDS.sleep(50);
connection1 = destination.getConnectionPool().getIdleConnections().peek();
connection1 = connectionPool.getIdleConnections().peek();
}
Assert.assertNotNull(connection1);
TimeUnit.MILLISECONDS.sleep(2 * idleTimeout);
connection1 = destination.getConnectionPool().getIdleConnections().poll();
connection1 = connectionPool.getIdleConnections().poll();
Assert.assertNull(connection1);
}
}
@ -210,35 +218,23 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/one")
.onRequestQueued(new Request.QueuedListener()
.onRequestQueued(request ->
{
@Override
public void onQueued(Request request)
{
// This request exceeds the maximum queued, should fail
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/two")
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
Assert.assertTrue(result.isFailed());
Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class));
failureLatch.countDown();
}
});
}
// This request exceeds the maximum queued, should fail
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.path("/two")
.send(result ->
{
Assert.assertTrue(result.isFailed());
Assert.assertThat(result.getRequestFailure(), Matchers.instanceOf(RejectedExecutionException.class));
failureLatch.countDown();
});
})
.send(new Response.CompleteListener()
.send(result ->
{
@Override
public void onComplete(Result result)
{
if (result.isSucceeded())
successLatch.countDown();
}
if (result.isSucceeded())
successLatch.countDown();
});
Assert.assertTrue(failureLatch.await(5, TimeUnit.SECONDS));

View File

@ -30,6 +30,7 @@ import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpRequest;
import org.eclipse.jetty.client.HttpResponseException;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.util.FutureResponseListener;
import org.eclipse.jetty.http.HttpFields;
@ -60,6 +61,7 @@ public class HttpReceiverOverHTTPTest
client = new HttpClient();
client.start();
destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
endPoint = new ByteArrayEndPoint();
connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<>());
endPoint.setConnection(connection);

View File

@ -67,6 +67,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch headersLatch = new CountDownLatch(1);
@ -100,6 +101,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, null);
@ -129,6 +131,7 @@ public class HttpSenderOverHTTPTest
// Shutdown output to trigger the exception on write
endPoint.shutdownOutput();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
@ -158,6 +161,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16);
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
@ -193,6 +197,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
String content = "abcdef";
@ -227,6 +232,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";
@ -262,6 +268,7 @@ public class HttpSenderOverHTTPTest
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", 8080));
destination.start();
HttpConnectionOverHTTP connection = new HttpConnectionOverHTTP(endPoint, destination, new Promise.Adapter<Connection>());
Request request = client.newRequest(URI.create("http://localhost/"));
String content1 = "0123456789";

View File

@ -22,8 +22,9 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.PoolingHttpDestination;
import org.eclipse.jetty.client.api.Connection;
public class HttpDestinationOverFCGI extends PoolingHttpDestination<HttpConnectionOverFCGI>
public class HttpDestinationOverFCGI extends PoolingHttpDestination
{
public HttpDestinationOverFCGI(HttpClient client, Origin origin)
{
@ -31,8 +32,8 @@ public class HttpDestinationOverFCGI extends PoolingHttpDestination<HttpConnecti
}
@Override
protected void send(HttpConnectionOverFCGI connection, HttpExchange exchange)
protected void send(Connection connection, HttpExchange exchange)
{
connection.send(exchange);
((HttpConnectionOverFCGI)connection).send(exchange);
}
}

View File

@ -22,8 +22,9 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination<HttpConnectionOverFCGI>
public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination
{
public MultiplexHttpDestinationOverFCGI(HttpClient client, Origin origin)
{
@ -31,8 +32,8 @@ public class MultiplexHttpDestinationOverFCGI extends MultiplexHttpDestination<H
}
@Override
protected void send(HttpConnectionOverFCGI connection, HttpExchange exchange)
protected void send(Connection connection, HttpExchange exchange)
{
connection.send(exchange);
((HttpConnectionOverFCGI)connection).send(exchange);
}
}

View File

@ -22,8 +22,9 @@ import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.MultiplexHttpDestination;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Connection;
public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination<HttpConnectionOverHTTP2>
public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination
{
public HttpDestinationOverHTTP2(HttpClient client, Origin origin)
{
@ -31,8 +32,8 @@ public class HttpDestinationOverHTTP2 extends MultiplexHttpDestination<HttpConne
}
@Override
protected void send(HttpConnectionOverHTTP2 connection, HttpExchange exchange)
protected void send(Connection connection, HttpExchange exchange)
{
connection.send(exchange);
((HttpConnectionOverHTTP2)connection).send(exchange);
}
}

View File

@ -61,6 +61,7 @@ import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.DuplexConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpContentResponse;
import org.eclipse.jetty.client.HttpProxy;
@ -1081,7 +1082,8 @@ public class ProxyServletTest
Assert.assertEquals(-1, input.read());
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
Assert.assertEquals(0, destination.getConnectionPool().getIdleConnections().size());
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test
@ -1154,7 +1156,8 @@ public class ProxyServletTest
}
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination("http", "localhost", port);
Assert.assertEquals(0, destination.getConnectionPool().getIdleConnections().size());
DuplexConnectionPool connectionPool = (DuplexConnectionPool)destination.getConnectionPool();
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test

View File

@ -41,6 +41,7 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.SslConnectionFactory;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After;
@ -89,22 +90,28 @@ public abstract class AbstractTest
QueuedThreadPool serverThreads = new QueuedThreadPool();
serverThreads.setName("server");
server = new Server(serverThreads);
connector = new ServerConnector(server, provideServerConnectionFactory(transport));
connector = newServerConnector(server);
server.addConnector(connector);
server.setHandler(handler);
server.start();
}
protected ServerConnector newServerConnector(Server server)
{
return new ServerConnector(server, provideServerConnectionFactory(transport));
}
private void startClient() throws Exception
{
QueuedThreadPool clientThreads = new QueuedThreadPool();
clientThreads.setName("client");
client = new HttpClient(provideClientTransport(transport), sslContextFactory);
client.setExecutor(clientThreads);
client.setSocketAddressResolver(new SocketAddressResolver.Sync());
client.start();
}
private ConnectionFactory[] provideServerConnectionFactory(Transport transport)
protected ConnectionFactory[] provideServerConnectionFactory(Transport transport)
{
List<ConnectionFactory> result = new ArrayList<>();
switch (transport)
@ -154,7 +161,7 @@ public abstract class AbstractTest
return result.toArray(new ConnectionFactory[result.size()]);
}
private HttpClientTransport provideClientTransport(Transport transport)
protected HttpClientTransport provideClientTransport(Transport transport)
{
switch (transport)
{
@ -197,6 +204,22 @@ public abstract class AbstractTest
}
}
protected boolean isTransportSecure()
{
switch (transport)
{
case HTTP:
case H2C:
case FCGI:
return false;
case HTTPS:
case H2:
return true;
default:
throw new IllegalArgumentException();
}
}
@After
public void stop() throws Exception
{

View File

@ -16,12 +16,11 @@
// ========================================================================
//
package org.eclipse.jetty.client;
package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Locale;
import java.util.Random;
@ -34,29 +33,33 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.ConnectionPool;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.client.HttpClientTransport;
import org.eclipse.jetty.client.HttpDestination;
import org.eclipse.jetty.client.LeakTrackingConnectionPool;
import org.eclipse.jetty.client.Origin;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.http.HttpClientTransportOverHTTP;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.client.http.HttpDestinationOverHTTP;
import org.eclipse.jetty.client.util.BytesContentProvider;
import org.eclipse.jetty.fcgi.client.http.HttpClientTransportOverFCGI;
import org.eclipse.jetty.fcgi.client.http.HttpDestinationOverFCGI;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.http.HttpScheme;
import org.eclipse.jetty.io.ArrayByteBufferPool;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.LeakTrackingByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.server.AbstractConnectionFactory;
import org.eclipse.jetty.server.HttpConnectionFactory;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.server.handler.AbstractHandler;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.SocketAddressResolver;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import org.hamcrest.Matchers;
import org.junit.Assert;
@ -64,66 +67,99 @@ import org.junit.Test;
import static org.junit.Assert.assertThat;
public class HttpClientLoadTest extends AbstractHttpClientServerTest
public class HttpClientLoadTest extends AbstractTest
{
private final Logger logger = Log.getLogger(HttpClientLoadTest.class);
private final AtomicLong connectionLeaks = new AtomicLong();
public HttpClientLoadTest(SslContextFactory sslContextFactory)
public HttpClientLoadTest(Transport transport)
{
super(sslContextFactory);
super(transport);
}
@Test
public void testIterative() throws Exception
@Override
protected ServerConnector newServerConnector(Server server)
{
int cores = Runtime.getRuntime().availableProcessors();
ByteBufferPool byteBufferPool = new ArrayByteBufferPool();
byteBufferPool = new LeakTrackingByteBufferPool(byteBufferPool);
return new ServerConnector(server, null, null, byteBufferPool,
1, Math.min(1, cores / 2), provideServerConnectionFactory(transport));
}
final AtomicLong connectionLeaks = new AtomicLong();
start(new LoadHandler());
server.stop();
server.removeConnector(connector);
LeakTrackingByteBufferPool serverBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
connector = new ServerConnector(server, connector.getExecutor(), connector.getScheduler(),
serverBufferPool , 1, Math.min(1, cores / 2),
AbstractConnectionFactory.getFactories(sslContextFactory, new HttpConnectionFactory()));
server.addConnector(connector);
server.start();
client.stop();
HttpClient newClient = new HttpClient(new HttpClientTransportOverHTTP()
@Override
protected HttpClientTransport provideClientTransport(Transport transport)
{
switch (transport)
{
@Override
public HttpDestination newHttpDestination(Origin origin)
case HTTP:
case HTTPS:
{
return new HttpDestinationOverHTTP(getHttpClient(), origin)
return new HttpClientTransportOverHTTP(1)
{
@Override
protected DuplexConnectionPool newConnectionPool(HttpClient client)
public HttpDestination newHttpDestination(Origin origin)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
return new HttpDestinationOverHTTP(getHttpClient(), origin)
{
@Override
protected void leaked(LeakDetector.LeakInfo resource)
protected ConnectionPool newConnectionPool(HttpClient client)
{
connectionLeaks.incrementAndGet();
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
};
}
};
}
};
}
}, sslContextFactory);
newClient.setExecutor(client.getExecutor());
newClient.setSocketAddressResolver(new SocketAddressResolver.Sync());
client = newClient;
LeakTrackingByteBufferPool clientBufferPool = new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged());
client.setByteBufferPool(clientBufferPool);
case FCGI:
{
return new HttpClientTransportOverFCGI(1, false, "")
{
@Override
public HttpDestination newHttpDestination(Origin origin)
{
return new HttpDestinationOverFCGI(getHttpClient(), origin)
{
@Override
protected ConnectionPool newConnectionPool(HttpClient client)
{
return new LeakTrackingConnectionPool(this, client.getMaxConnectionsPerDestination(), this)
{
@Override
protected void leaked(LeakDetector.LeakInfo leakInfo)
{
super.leaked(leakInfo);
connectionLeaks.incrementAndGet();
}
};
}
};
}
};
}
default:
{
return super.provideClientTransport(transport);
}
}
}
@Test
public void testIterative() throws Exception
{
start(new LoadHandler());
client.setByteBufferPool(new LeakTrackingByteBufferPool(new MappedByteBufferPool.Tagged()));
client.setMaxConnectionsPerDestination(32768);
client.setMaxRequestsQueuedPerDestination(1024 * 1024);
client.setDispatchIO(false);
client.setStrictEventOrdering(false);
client.start();
Random random = new Random();
// At least 25k requests to warmup properly (use -XX:+PrintCompilation to verify JIT activity)
@ -143,13 +179,23 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
System.gc();
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
ByteBufferPool byteBufferPool = connector.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
{
LeakTrackingByteBufferPool serverBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Server BufferPool - leaked acquires", serverBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Server BufferPool - leaked releases", serverBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Server BufferPool - unreleased", serverBufferPool.getLeakedResources(), Matchers.is(0L));
}
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
byteBufferPool = client.getByteBufferPool();
if (byteBufferPool instanceof LeakTrackingByteBufferPool)
{
LeakTrackingByteBufferPool clientBufferPool = (LeakTrackingByteBufferPool)byteBufferPool;
assertThat("Client BufferPool - leaked acquires", clientBufferPool.getLeakedAcquires(), Matchers.is(0L));
assertThat("Client BufferPool - leaked releases", clientBufferPool.getLeakedReleases(), Matchers.is(0L));
assertThat("Client BufferPool - unreleased", clientBufferPool.getLeakedResources(), Matchers.is(0L));
}
assertThat("Connection Leaks", connectionLeaks.get(), Matchers.is(0L));
}
@ -159,29 +205,15 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
CountDownLatch latch = new CountDownLatch(iterations);
List<String> failures = new ArrayList<>();
int factor = logger.isDebugEnabled() ? 25 : 1;
factor *= "http".equalsIgnoreCase(scheme) ? 10 : 1000;
int factor = (logger.isDebugEnabled() ? 25 : 1) * 100;
// Dumps the state of the client if the test takes too long
final Thread testThread = Thread.currentThread();
Scheduler.Task task = client.getScheduler().schedule(new Runnable()
Scheduler.Task task = client.getScheduler().schedule(() ->
{
@Override
public void run()
{
logger.warn("Interrupting test, it is taking too long");
for (String host : Arrays.asList("localhost", "127.0.0.1"))
{
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, host, connector.getLocalPort());
DuplexConnectionPool connectionPool = destination.getConnectionPool();
for (Connection connection : new ArrayList<>(connectionPool.getActiveConnections()))
{
HttpConnectionOverHTTP active = (HttpConnectionOverHTTP)connection;
logger.warn(active.getEndPoint() + " exchange " + active.getHttpChannel().getHttpExchange());
}
}
testThread.interrupt();
}
logger.warn("Interrupting test, it is taking too long");
logger.warn(client.dump());
testThread.interrupt();
}, iterations * factor, TimeUnit.MILLISECONDS);
long begin = System.nanoTime();
@ -209,7 +241,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
// Choose a random method
HttpMethod method = random.nextBoolean() ? HttpMethod.GET : HttpMethod.POST;
boolean ssl = HttpScheme.HTTPS.is(scheme);
boolean ssl = isTransportSecure();
// Choose randomly whether to close the connection on the client or on the server
boolean clientClose = false;
@ -222,7 +254,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
int maxContentLength = 64 * 1024;
int contentLength = random.nextInt(maxContentLength) + 1;
test(scheme, host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
test(ssl ? "https" : "http", host, method.asString(), clientClose, serverClose, contentLength, true, latch, failures);
}
private void test(String scheme, String host, String method, boolean clientClose, boolean serverClose, int contentLength, final boolean checkContentLength, final CountDownLatch latch, final List<String> failures) throws InterruptedException
@ -298,6 +330,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
switch (method)
{
case "GET":
{
int contentLength = request.getIntHeader("X-Download");
if (contentLength > 0)
{
@ -305,10 +338,13 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
response.getOutputStream().write(new byte[contentLength]);
}
break;
}
case "POST":
{
response.setHeader("X-Content", request.getHeader("X-Upload"));
IO.copy(request.getInputStream(), response.getOutputStream());
break;
}
}
if (Boolean.parseBoolean(request.getHeader("X-Close")))