Removed extra dispatches from HTTP client code.

This commit is contained in:
Simone Bordet 2015-02-19 17:45:58 +01:00
parent d4dfc0762f
commit 71f6527cae
8 changed files with 55 additions and 91 deletions

View File

@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
@ -41,15 +42,15 @@ public class ConnectionPool implements Closeable, Dumpable
private final AtomicInteger connectionCount = new AtomicInteger();
private final Destination destination;
private final int maxConnections;
private final Promise<Connection> connectionPromise;
private final Callback requester;
private final BlockingDeque<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
public ConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
public ConnectionPool(Destination destination, int maxConnections, Callback requester)
{
this.destination = destination;
this.maxConnections = maxConnections;
this.connectionPromise = connectionPromise;
this.requester = requester;
this.idleConnections = new LinkedBlockingDeque<>(maxConnections);
this.activeConnections = new BlockingArrayQueue<>(maxConnections);
}
@ -104,10 +105,8 @@ public class ConnectionPool implements Closeable, Dumpable
{
if (LOG.isDebugEnabled())
LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection);
if (activate(connection))
connectionPromise.succeeded(connection);
else
connectionPromise.failed(new IllegalStateException("Active connection overflow"));
idle(connection, true);
requester.succeeded();
}
@Override
@ -116,7 +115,7 @@ public class ConnectionPool implements Closeable, Dumpable
if (LOG.isDebugEnabled())
LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x);
connectionCount.decrementAndGet();
connectionPromise.failed(x);
requester.failed(x);
}
});
@ -160,24 +159,30 @@ public class ConnectionPool implements Closeable, Dumpable
{
released(connection);
if (activeConnections.remove(connection))
{
// Make sure we use "hot" connections first
if (idleConnections.offerFirst(connection))
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle {}", connection);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle overflow {}", connection);
connection.close();
}
}
return idle(connection, false);
return false;
}
protected boolean idle(Connection connection, boolean created)
{
// Make sure we use "hot" connections first.
boolean idle = created ? idleConnections.offerLast(connection)
: idleConnections.offerFirst(connection);
if (idle)
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle {}", connection);
return true;
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Connection idle overflow {}", connection);
connection.close();
return false;
}
}
protected void released(Connection connection)
{
}

View File

@ -205,7 +205,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
return queue.offer(exchange);
}
protected abstract void send();
public abstract void send();
public void newConnection(Promise<Connection> promise)
{

View File

@ -20,8 +20,8 @@ package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.LeakDetector;
import org.eclipse.jetty.util.Promise;
public class LeakTrackingConnectionPool extends ConnectionPool
{
@ -34,9 +34,9 @@ public class LeakTrackingConnectionPool extends ConnectionPool
}
};
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Promise<Connection> connectionPromise)
public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester)
{
super(destination, maxConnections, connectionPromise);
super(destination, maxConnections, requester);
start();
}

View File

@ -35,7 +35,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
}
@Override
protected void send()
public void send()
{
while (true)
{
@ -56,7 +56,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
}
case CONNECTED:
{
if (process(connection, false))
if (process(connection))
break;
return;
}
@ -75,7 +75,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
C connection = this.connection = (C)result;
if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED))
{
process(connection, true);
process(connection);
}
else
{
@ -91,7 +91,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
abort(x);
}
protected boolean process(final C connection, boolean dispatch)
protected boolean process(final C connection)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
@ -113,21 +113,7 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
}
else
{
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
send(connection, exchange);
}
});
}
else
{
send(connection, exchange);
}
send(connection, exchange);
}
return true;
}

View File

@ -23,10 +23,10 @@ import java.util.Arrays;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.util.Promise;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Promise<Connection>
public abstract class PoolingHttpDestination<C extends Connection> extends HttpDestination implements Callback
{
private final ConnectionPool connectionPool;
@ -47,30 +47,23 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
}
@Override
@SuppressWarnings("unchecked")
public void succeeded(Connection connection)
public void succeeded()
{
process((C)connection, true);
}
@Override
public void failed(final Throwable x)
{
getHttpClient().getExecutor().execute(new Runnable()
{
@Override
public void run()
{
abort(x);
}
});
abort(x);
}
protected void send()
public void send()
{
if (getHttpExchanges().isEmpty())
return;
C connection = acquire();
if (connection != null)
process(connection, false);
process(connection);
}
@SuppressWarnings("unchecked")
@ -87,9 +80,8 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
* <p>If a request is waiting to be executed, it will be dequeued and executed by the new connection.</p>
*
* @param connection the new connection
* @param dispatch whether to dispatch the processing to another thread
*/
public void process(final C connection, boolean dispatch)
public void process(final C connection)
{
HttpClient client = getHttpClient();
final HttpExchange exchange = getHttpExchanges().poll();
@ -122,21 +114,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
}
else
{
if (dispatch)
{
client.getExecutor().execute(new Runnable()
{
@Override
public void run()
{
send(connection, exchange);
}
});
}
else
{
send(connection, exchange);
}
send(connection, exchange);
}
}
}
@ -155,7 +133,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
{
if (connectionPool.isActive(connection))
{
process(connection, false);
process(connection);
}
else
{
@ -198,7 +176,7 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
// idle timeout, so no worries.
C newConnection = acquire();
if (newConnection != null)
process(newConnection, false);
process(newConnection);
}
}

View File

@ -18,7 +18,6 @@
package org.eclipse.jetty.client.http;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
@ -76,6 +75,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
{
super.onOpen();
fillInterested();
getHttpDestination().send();
}
public boolean isClosed()

View File

@ -87,16 +87,11 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
Assert.assertFalse(begin.get());
}
// The request send triggered a connection creation
// that is not awaited before failing the exchange.
Thread.sleep(1000);
// However, the connection has not been used, so it's a good one.
HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort());
ConnectionPool connectionPool = destination.getConnectionPool();
Assert.assertEquals(1, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getConnectionCount());
Assert.assertEquals(0, connectionPool.getActiveConnections().size());
Assert.assertEquals(1, connectionPool.getIdleConnections().size());
Assert.assertEquals(0, connectionPool.getIdleConnections().size());
}
@Test

View File

@ -94,12 +94,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort()))
{
@Override
public void process(HttpConnectionOverHTTP connection, boolean dispatch)
public void process(HttpConnectionOverHTTP connection)
{
try
{
latch.await(5, TimeUnit.SECONDS);
super.process(connection, dispatch);
super.process(connection);
}
catch (InterruptedException x)
{
@ -142,7 +142,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
// Acquire the connection to make it active
Assert.assertSame(connection1, destination.acquire());
destination.process(connection1, false);
destination.process(connection1);
destination.release(connection1);
Connection connection2 = destination.acquire();