diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java index 0af5286126a..df91688258f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ConnectionPool.java @@ -28,6 +28,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; @@ -41,15 +42,15 @@ public class ConnectionPool implements Closeable, Dumpable private final AtomicInteger connectionCount = new AtomicInteger(); private final Destination destination; private final int maxConnections; - private final Promise connectionPromise; + private final Callback requester; private final BlockingDeque idleConnections; private final BlockingQueue activeConnections; - public ConnectionPool(Destination destination, int maxConnections, Promise connectionPromise) + public ConnectionPool(Destination destination, int maxConnections, Callback requester) { this.destination = destination; this.maxConnections = maxConnections; - this.connectionPromise = connectionPromise; + this.requester = requester; this.idleConnections = new LinkedBlockingDeque<>(maxConnections); this.activeConnections = new BlockingArrayQueue<>(maxConnections); } @@ -104,10 +105,8 @@ public class ConnectionPool implements Closeable, Dumpable { if (LOG.isDebugEnabled()) LOG.debug("Connection {}/{} creation succeeded {}", next, maxConnections, connection); - if (activate(connection)) - connectionPromise.succeeded(connection); - else - connectionPromise.failed(new IllegalStateException("Active connection overflow")); + idle(connection, true); + requester.succeeded(); } @Override @@ -116,7 +115,7 @@ public class ConnectionPool implements Closeable, Dumpable if (LOG.isDebugEnabled()) LOG.debug("Connection " + next + "/" + maxConnections + " creation failed", x); connectionCount.decrementAndGet(); - connectionPromise.failed(x); + requester.failed(x); } }); @@ -160,24 +159,30 @@ public class ConnectionPool implements Closeable, Dumpable { released(connection); if (activeConnections.remove(connection)) - { - // Make sure we use "hot" connections first - if (idleConnections.offerFirst(connection)) - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection idle {}", connection); - return true; - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Connection idle overflow {}", connection); - connection.close(); - } - } + return idle(connection, false); return false; } + protected boolean idle(Connection connection, boolean created) + { + // Make sure we use "hot" connections first. + boolean idle = created ? idleConnections.offerLast(connection) + : idleConnections.offerFirst(connection); + if (idle) + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection idle {}", connection); + return true; + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Connection idle overflow {}", connection); + connection.close(); + return false; + } + } + protected void released(Connection connection) { } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index ad919e51614..aca7c85a540 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -205,7 +205,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl return queue.offer(exchange); } - protected abstract void send(); + public abstract void send(); public void newConnection(Promise promise) { diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java index a1b87344f85..b22eb90a833 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/LeakTrackingConnectionPool.java @@ -20,8 +20,8 @@ package org.eclipse.jetty.client; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Destination; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.LeakDetector; -import org.eclipse.jetty.util.Promise; public class LeakTrackingConnectionPool extends ConnectionPool { @@ -34,9 +34,9 @@ public class LeakTrackingConnectionPool extends ConnectionPool } }; - public LeakTrackingConnectionPool(Destination destination, int maxConnections, Promise connectionPromise) + public LeakTrackingConnectionPool(Destination destination, int maxConnections, Callback requester) { - super(destination, maxConnections, connectionPromise); + super(destination, maxConnections, requester); start(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java index 187fadcb15a..c73659ae0ee 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/MultiplexHttpDestination.java @@ -35,7 +35,7 @@ public abstract class MultiplexHttpDestination extends Htt } @Override - protected void send() + public void send() { while (true) { @@ -56,7 +56,7 @@ public abstract class MultiplexHttpDestination extends Htt } case CONNECTED: { - if (process(connection, false)) + if (process(connection)) break; return; } @@ -75,7 +75,7 @@ public abstract class MultiplexHttpDestination extends Htt C connection = this.connection = (C)result; if (connect.compareAndSet(ConnectState.CONNECTING, ConnectState.CONNECTED)) { - process(connection, true); + process(connection); } else { @@ -91,7 +91,7 @@ public abstract class MultiplexHttpDestination extends Htt abort(x); } - protected boolean process(final C connection, boolean dispatch) + protected boolean process(final C connection) { HttpClient client = getHttpClient(); final HttpExchange exchange = getHttpExchanges().poll(); @@ -113,21 +113,7 @@ public abstract class MultiplexHttpDestination extends Htt } else { - if (dispatch) - { - client.getExecutor().execute(new Runnable() - { - @Override - public void run() - { - send(connection, exchange); - } - }); - } - else - { - send(connection, exchange); - } + send(connection, exchange); } return true; } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java index f00e9475d4b..cc4a0e2bd1c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/PoolingHttpDestination.java @@ -23,10 +23,10 @@ import java.util.Arrays; import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Request; -import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.ContainerLifeCycle; -public abstract class PoolingHttpDestination extends HttpDestination implements Promise +public abstract class PoolingHttpDestination extends HttpDestination implements Callback { private final ConnectionPool connectionPool; @@ -47,30 +47,23 @@ public abstract class PoolingHttpDestination extends HttpD } @Override - @SuppressWarnings("unchecked") - public void succeeded(Connection connection) + public void succeeded() { - process((C)connection, true); } @Override public void failed(final Throwable x) { - getHttpClient().getExecutor().execute(new Runnable() - { - @Override - public void run() - { - abort(x); - } - }); + abort(x); } - protected void send() + public void send() { + if (getHttpExchanges().isEmpty()) + return; C connection = acquire(); if (connection != null) - process(connection, false); + process(connection); } @SuppressWarnings("unchecked") @@ -87,9 +80,8 @@ public abstract class PoolingHttpDestination extends HttpD *

If a request is waiting to be executed, it will be dequeued and executed by the new connection.

* * @param connection the new connection - * @param dispatch whether to dispatch the processing to another thread */ - public void process(final C connection, boolean dispatch) + public void process(final C connection) { HttpClient client = getHttpClient(); final HttpExchange exchange = getHttpExchanges().poll(); @@ -122,21 +114,7 @@ public abstract class PoolingHttpDestination extends HttpD } else { - if (dispatch) - { - client.getExecutor().execute(new Runnable() - { - @Override - public void run() - { - send(connection, exchange); - } - }); - } - else - { - send(connection, exchange); - } + send(connection, exchange); } } } @@ -155,7 +133,7 @@ public abstract class PoolingHttpDestination extends HttpD { if (connectionPool.isActive(connection)) { - process(connection, false); + process(connection); } else { @@ -198,7 +176,7 @@ public abstract class PoolingHttpDestination extends HttpD // idle timeout, so no worries. C newConnection = acquire(); if (newConnection != null) - process(newConnection, false); + process(newConnection); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index ecbf5aa8306..59e27b3675d 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -18,7 +18,6 @@ package org.eclipse.jetty.client.http; -import java.nio.ByteBuffer; import java.nio.channels.AsynchronousCloseException; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; @@ -76,6 +75,7 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec { super.onOpen(); fillInterested(); + getHttpDestination().send(); } public boolean isClosed() diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java index 4268d4d0661..d40178de47d 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpRequestAbortTest.java @@ -87,16 +87,11 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest Assert.assertFalse(begin.get()); } - // The request send triggered a connection creation - // that is not awaited before failing the exchange. - Thread.sleep(1000); - - // However, the connection has not been used, so it's a good one. HttpDestinationOverHTTP destination = (HttpDestinationOverHTTP)client.getDestination(scheme, "localhost", connector.getLocalPort()); ConnectionPool connectionPool = destination.getConnectionPool(); - Assert.assertEquals(1, connectionPool.getConnectionCount()); + Assert.assertEquals(0, connectionPool.getConnectionCount()); Assert.assertEquals(0, connectionPool.getActiveConnections().size()); - Assert.assertEquals(1, connectionPool.getIdleConnections().size()); + Assert.assertEquals(0, connectionPool.getIdleConnections().size()); } @Test diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java index a74bbc73bd0..c0747a593f7 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpDestinationOverHTTPTest.java @@ -94,12 +94,12 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest HttpDestinationOverHTTP destination = new HttpDestinationOverHTTP(client, new Origin("http", "localhost", connector.getLocalPort())) { @Override - public void process(HttpConnectionOverHTTP connection, boolean dispatch) + public void process(HttpConnectionOverHTTP connection) { try { latch.await(5, TimeUnit.SECONDS); - super.process(connection, dispatch); + super.process(connection); } catch (InterruptedException x) { @@ -142,7 +142,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest // Acquire the connection to make it active Assert.assertSame(connection1, destination.acquire()); - destination.process(connection1, false); + destination.process(connection1); destination.release(connection1); Connection connection2 = destination.acquire();