diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index 1502d42940f..402f1afaaac 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -214,6 +214,11 @@ public class HttpClient extends AggregateLifeCycle return newRequest(URI.create(address("http", host, port))); } + public Request newRequest(String uri) + { + return newRequest(URI.create(uri)); + } + public Request newRequest(URI uri) { return new HttpRequest(this, uri); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index ce75317d212..1c6551c3f51 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -74,13 +74,13 @@ public class HttpConnection extends AbstractConnection implements Connection @Override protected boolean onReadTimeout() { + LOG.debug("{} idle timeout", this); + HttpExchange exchange = this.exchange.get(); if (exchange != null) idleTimeout(); - - // We will be closing the connection, so remove it - LOG.debug("Connection {} idle timeout", this); - destination.remove(this); + else + destination.remove(this); return true; } @@ -218,9 +218,12 @@ public class HttpConnection extends AbstractConnection implements Connection } else { - destination.remove(this); - close(); - throw new IllegalStateException(); + // It is possible that the exchange has already been disassociated, + // for example if the connection idle timeouts: this will fail + // the response, but the request may still be under processing. + // Eventually the request will also fail as the connection is closed + // and will arrive here without an exchange being present. + // We just ignore this fact, as the exchange has already been processed } } @@ -228,7 +231,7 @@ public class HttpConnection extends AbstractConnection implements Connection public void close() { super.close(); - LOG.debug("Closed {}", this); + LOG.debug("{} closed", this); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 9ad21fc4dce..6dc59e16ff1 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -193,26 +193,27 @@ public class HttpDestination implements Destination, AutoCloseable if (next > maxConnections) { - LOG.debug("Max connections reached {}: {}", this, current); + LOG.debug("Max connections {} reached for {}", current, this); // Try again the idle connections return idleConnections.poll(); } if (connectionCount.compareAndSet(current, next)) { + LOG.debug("Creating connection {}/{} for {}", next, maxConnections, this); newConnection(new Callback() { @Override public void completed(Connection connection) { - LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, this); + LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, HttpDestination.this); process(connection); } @Override public void failed(Connection connection, final Throwable x) { - LOG.debug("Connection failed {} for {}", x, this); + LOG.debug("Connection failed {} for {}", x, HttpDestination.this); connectionCount.decrementAndGet(); client.getExecutor().execute(new Runnable() { @@ -249,12 +250,18 @@ public class HttpDestination implements Destination, AutoCloseable final RequestPair requestPair = requests.poll(); if (requestPair == null) { - LOG.debug("Connection {} idle", connection); + LOG.debug("{} idle", connection); idleConnections.offer(connection); + if (!client.isRunning()) + { + LOG.debug("{} is stopping", client); + remove(connection); + connection.close(); + } } else { - LOG.debug("Connection {} active", connection); + LOG.debug("{} active", connection); activeConnections.offer(connection); client.getExecutor().execute(new Runnable() { @@ -269,31 +276,36 @@ public class HttpDestination implements Destination, AutoCloseable public void release(Connection connection) { - LOG.debug("Connection {} released", connection); + LOG.debug("{} released", connection); if (client.isRunning()) { activeConnections.remove(connection); - idleConnections.offer(connection); - if (!client.isRunning()) - { - LOG.debug("{} is stopping", client); - idleConnections.remove(connection); - connection.close(); - } + process(connection); } else { LOG.debug("{} is stopped", client); + remove(connection); connection.close(); } } public void remove(Connection connection) { - LOG.debug("Connection {} removed", connection); + LOG.debug("{} removed", connection); connectionCount.decrementAndGet(); activeConnections.remove(connection); idleConnections.remove(connection); + + // We need to executed queued requests even if this connection failed. + // We may create a connection that is not needed, but it will eventually + // idle timeout, so no worries + if (!requests.isEmpty()) + { + connection = acquire(); + if (connection != null) + process(connection); + } } public void close() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponseException.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponseException.java index 1539090f300..4aceda1de29 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponseException.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponseException.java @@ -23,24 +23,4 @@ public class HttpResponseException extends RuntimeException public HttpResponseException() { } - - public HttpResponseException(String message) - { - super(message); - } - - public HttpResponseException(String message, Throwable cause) - { - super(message, cause); - } - - public HttpResponseException(Throwable cause) - { - super(cause); - } - - public HttpResponseException(String message, Throwable cause, boolean enableSuppression, boolean writableStackTrace) - { - super(message, cause, enableSuppression, writableStackTrace); - } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index da5e4f4478b..e5091802579 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.client; import java.io.IOException; import java.net.URLEncoder; import java.util.List; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.ServletOutputStream; @@ -33,6 +34,7 @@ import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpCookie; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.junit.Assert; import org.junit.Test; @@ -51,7 +53,14 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.status()); HttpDestination destination = (HttpDestination)client.getDestination(scheme, host, port); - HttpConnection connection = (HttpConnection)destination.getIdleConnections().peek(); + + long start = System.nanoTime(); + HttpConnection connection = null; + while (connection == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) + { + connection = (HttpConnection)destination.getIdleConnections().peek(); + TimeUnit.MILLISECONDS.sleep(10); + } Assert.assertNotNull(connection); client.getCookieStore().addCookie(destination, new HttpCookie("foo", "bar", null, path)); @@ -86,7 +95,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest } @Test - public void test_GET_ResponseWithContent() throws Exception + public void test_GET_ResponseWithoutContent() throws Exception { start(new EmptyHandler()); @@ -97,7 +106,7 @@ public class HttpClientTest extends AbstractHttpClientServerTest } @Test - public void test_GET_ResponseWithoutContent() throws Exception + public void test_GET_ResponseWithContent() throws Exception { final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; start(new AbstractHandler() @@ -185,4 +194,117 @@ public class HttpClientTest extends AbstractHttpClientServerTest String content = new String(response.content(), "UTF-8"); Assert.assertEquals(value11 + value12 + value2, content); } + + @Test + public void test_QueuedRequest_IsSent_WhenPreviousRequestSucceeded() throws Exception + { + start(new EmptyHandler()); + + client.setMaxConnectionsPerAddress(1); + + final CountDownLatch latch = new CountDownLatch(1); + final CountDownLatch successLatch = new CountDownLatch(2); + client.newRequest("http://localhost:" + connector.getLocalPort()) + .listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter() + { + @Override + public void onBegin(org.eclipse.jetty.client.api.Request request) + { + try + { + latch.await(); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + }) + .send(new Response.Listener.Adapter() + { + @Override + public void onSuccess(Response response) + { + Assert.assertEquals(200, response.status()); + successLatch.countDown(); + } + }); + + client.newRequest("http://localhost:" + connector.getLocalPort()) + .listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter() + { + @Override + public void onQueued(org.eclipse.jetty.client.api.Request request) + { + latch.countDown(); + } + }) + .send(new Response.Listener.Adapter() + { + @Override + public void onSuccess(Response response) + { + Assert.assertEquals(200, response.status()); + successLatch.countDown(); + } + }); + + Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); + } + + @Slow + @Test + public void test_QueuedRequest_IsSent_WhenPreviousRequestClosedConnection() throws Exception + { + start(new EmptyHandler()); + + client.setMaxConnectionsPerAddress(1); + final long idleTimeout = 1000; + client.setIdleTimeout(idleTimeout); + + final CountDownLatch latch = new CountDownLatch(3); + client.newRequest("http://localhost:" + connector.getLocalPort()) + .listener(new org.eclipse.jetty.client.api.Request.Listener.Adapter() + { + @Override + public void onBegin(org.eclipse.jetty.client.api.Request request) + { + try + { + TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + + @Override + public void onFailure(org.eclipse.jetty.client.api.Request request, Throwable failure) + { + latch.countDown(); + } + }) + .send(new Response.Listener.Adapter() + { + @Override + public void onFailure(Response response, Throwable failure) + { + latch.countDown(); + } + }); + + client.newRequest("http://localhost:" + connector.getLocalPort()) + .send(new Response.Listener.Adapter() + { + @Override + public void onSuccess(Response response) + { + Assert.assertEquals(200, response.status()); + latch.countDown(); + } + }); + + Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS)); + } }