diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java index 199b1f81a1a..9cf510b627e 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -36,8 +36,6 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.http.HttpMethod; -import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.MappedByteBufferPool; @@ -147,7 +145,9 @@ public class HttpClient extends AggregateLifeCycle @Override protected void doStop() throws Exception { + LOG.debug("Stopping {}", this); super.doStop(); + LOG.info("Stopped {}", this); } public long getIdleTimeout() @@ -195,26 +195,12 @@ public class HttpClient extends AggregateLifeCycle public Request newRequest(URI uri) { - HttpRequest request = new HttpRequest(this, uri); - normalizeRequest(request); - return request; + return new HttpRequest(this, uri); } protected Request newRequest(long id, URI uri) { - HttpRequest request = new HttpRequest(this, id, uri); - normalizeRequest(request); - return request; - } - - protected void normalizeRequest(Request request) - { - // TODO: Add decoder, cookies, agent, default headers, etc. - request.method(HttpMethod.GET) - .version(HttpVersion.HTTP_1_1) - .agent(getUserAgent()) - .idleTimeout(getIdleTimeout()) - .followRedirects(isFollowRedirects()); + return new HttpRequest(this, id, uri); } private String address(String scheme, String host, int port) @@ -322,7 +308,7 @@ public class HttpClient extends AggregateLifeCycle this.responseBufferSize = responseBufferSize; } - protected void newConnection(Destination destination, Callback callback) + protected void newConnection(HttpDestination destination, Callback callback) { SocketChannel channel = null; try @@ -407,7 +393,7 @@ public class HttpClient extends AggregateLifeCycle public org.eclipse.jetty.io.Connection newConnection(SocketChannel channel, EndPoint endPoint, Object attachment) throws IOException { ConnectionCallback callback = (ConnectionCallback)attachment; - Destination destination = callback.destination; + HttpDestination destination = callback.destination; SslContextFactory sslContextFactory = getSslContextFactory(); if ("https".equals(destination.scheme())) @@ -426,7 +412,7 @@ public class HttpClient extends AggregateLifeCycle SslConnection sslConnection = new SslConnection(getByteBufferPool(), getExecutor(), endPoint, engine); EndPoint appEndPoint = sslConnection.getDecryptedEndPoint(); - HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint); + HttpConnection connection = new HttpConnection(HttpClient.this, appEndPoint, destination); appEndPoint.setConnection(connection); callback.callback.completed(connection); connection.onOpen(); @@ -436,7 +422,7 @@ public class HttpClient extends AggregateLifeCycle } else { - HttpConnection connection = new HttpConnection(HttpClient.this, endPoint); + HttpConnection connection = new HttpConnection(HttpClient.this, endPoint, destination); callback.callback.completed(connection); return connection; } @@ -451,10 +437,10 @@ public class HttpClient extends AggregateLifeCycle private class ConnectionCallback extends FutureCallback { - private final Destination destination; + private final HttpDestination destination; private final Callback callback; - private ConnectionCallback(Destination destination, Callback callback) + private ConnectionCallback(HttpDestination destination, Callback callback) { this.destination = destination; this.callback = callback; diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java index d29e30c01bd..e7ddc743b86 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -8,6 +8,7 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.EndPoint; @@ -20,13 +21,15 @@ public class HttpConnection extends AbstractConnection implements Connection private final AtomicReference exchange = new AtomicReference<>(); private final HttpClient client; + private final HttpDestination destination; private final HttpSender sender; private final HttpReceiver receiver; - public HttpConnection(HttpClient client, EndPoint endPoint) + public HttpConnection(HttpClient client, EndPoint endPoint, HttpDestination destination) { super(endPoint, client.getExecutor()); this.client = client; + this.destination = destination; this.sender = new HttpSender(this); this.receiver = new HttpReceiver(this); } @@ -49,6 +52,10 @@ public class HttpConnection extends AbstractConnection implements Connection HttpExchange exchange = this.exchange.get(); if (exchange != null) exchange.idleTimeout(); + + // We will be closing the connection, so remove it + destination.remove(this); + return true; } @@ -72,6 +79,21 @@ public class HttpConnection extends AbstractConnection implements Connection private void normalizeRequest(Request request) { + if (request.method() == null) + request.method(HttpMethod.GET); + + if (request.version() == null) + request.version(HttpVersion.HTTP_1_1); + + if (request.agent() == null) + request.agent(client.getUserAgent()); + + if (request.idleTimeout() <= 0) + request.idleTimeout(client.getIdleTimeout()); + + // TODO: follow redirects + // TODO: cookies + HttpVersion version = request.version(); HttpFields headers = request.headers(); ContentProvider content = request.content(); @@ -103,7 +125,13 @@ public class HttpConnection extends AbstractConnection implements Connection if (version.getVersion() > 10) { if (!headers.containsKey(HttpHeader.HOST.asString())) - headers.put(HttpHeader.HOST, request.host() + ":" + request.port()); + { + String value = request.host(); + int port = request.port(); + if (port > 0) + value += ":" + port; + headers.put(HttpHeader.HOST, value); + } } } @@ -116,4 +144,14 @@ public class HttpConnection extends AbstractConnection implements Connection else throw new IllegalStateException(); } + + @Override + public String toString() + { + return String.format("%s@%x(l:%s <-> r:%s)", + HttpConnection.class.getSimpleName(), + hashCode(), + getEndPoint().getLocalAddress(), + getEndPoint().getRemoteAddress()); + } } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 43491b00988..b37ea88404f 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -15,6 +15,7 @@ package org.eclipse.jetty.client; import java.util.Queue; import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; import java.util.concurrent.Future; import java.util.concurrent.RejectedExecutionException; import java.util.concurrent.atomic.AtomicInteger; @@ -38,8 +39,8 @@ public class HttpDestination implements Destination private final String host; private final int port; private final Queue requests; - private final Queue idleConnections; - private final Queue activeConnections; + private final BlockingQueue idleConnections; + private final BlockingQueue activeConnections; public HttpDestination(HttpClient client, String scheme, String host, int port) { @@ -52,6 +53,11 @@ public class HttpDestination implements Destination this.activeConnections = new ArrayBlockingQueue<>(client.getMaxConnectionsPerAddress()); } + protected BlockingQueue idleConnections() + { + return idleConnections; + } + @Override public String scheme() { @@ -93,7 +99,9 @@ public class HttpDestination implements Destination { LOG.debug("Queued {}", request); notifyRequestQueued(request.listener(), request); - ensureConnection(); // TODO: improve and test this + Connection connection = acquire(); + if (connection != null) + process(connection); } } else @@ -120,8 +128,24 @@ public class HttpDestination implements Destination } } - private void ensureConnection() + public Future newConnection() { + FutureCallback result = new FutureCallback<>(); + newConnection(result); + return result; + } + + protected void newConnection(Callback callback) + { + client.newConnection(this, callback); + } + + protected Connection acquire() + { + Connection result = idleConnections.poll(); + if (result != null) + return result; + final int maxConnections = client.getMaxConnectionsPerAddress(); while (true) { @@ -131,7 +155,8 @@ public class HttpDestination implements Destination if (next > maxConnections) { LOG.debug("Max connections reached {}: {}", this, current); - break; + // Try again the idle connections + return idleConnections.poll(); } if (connectionCount.compareAndSet(current, next)) @@ -141,7 +166,7 @@ public class HttpDestination implements Destination @Override public void completed(Connection connection) { - LOG.debug("Created connection {}/{} for {}", next, maxConnections, this); + LOG.debug("Created connection {}/{} {} for {}", next, maxConnections, connection, this); process(connection); } @@ -151,23 +176,12 @@ public class HttpDestination implements Destination // TODO: what here ? } }); - break; + // Try again the idle connections + return idleConnections.poll(); } } } - public Future newConnection() - { - FutureCallback result = new FutureCallback<>(); - newConnection(result); - return result; - } - - private void newConnection(Callback callback) - { - client.newConnection(this, callback); - } - /** *

Processes a new connection making it idle or active depending on whether requests are waiting to be sent.

*

A new connection is created when a request needs to be executed; it is possible that the request that @@ -182,6 +196,7 @@ public class HttpDestination implements Destination final RequestPair requestPair = requests.poll(); if (requestPair == null) { + LOG.debug("Connection {} idle", connection); idleConnections.offer(connection); } else @@ -198,20 +213,18 @@ public class HttpDestination implements Destination } } - // TODO: 1. We must do queuing of requests in any case, because we cannot do blocking connect - // TODO: 2. We must be non-blocking connect, therefore we need to queue - - // Connections should compete for the queue of requests in separate threads - // that poses a problem of thread pool size: if < maxConnections we're starving - // - // conn1 is executed, takes on the queue => I need at least one thread per destination - - // we need to queue the request, pick an idle connection, then execute { conn.send(request, listener) } - - // if I create manually the connection, then I call send(request, listener) - - // Other ways ? + public void release(Connection connection) + { + activeConnections.remove(connection); + idleConnections.offer(connection); + } + public void remove(Connection connection) + { + connectionCount.decrementAndGet(); + activeConnections.remove(connection); + idleConnections.remove(connection); + } @Override public String toString() diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java index a7a64cf77b6..4b475e4fead 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -57,7 +57,7 @@ public class HttpExchange receiver.receive(this); } - public void requestDone() + public void requestDone(boolean success) { // TODO } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java index 23dd6cc33db..b1919a96fc5 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -24,6 +24,7 @@ import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.util.FutureCallback; @@ -147,13 +148,13 @@ public class HttpRequest implements Request @Override public String agent() { - return agent; + return headers.get(HttpHeader.USER_AGENT); } @Override public Request agent(String userAgent) { - this.agent = userAgent; + headers.put(HttpHeader.USER_AGENT, userAgent); return this; } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java index 7edd1cdfb07..6729c73a12a 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -30,7 +30,8 @@ public class HttpSender private Iterator contentChunks; private ByteBuffer header; private ByteBuffer chunk; - private boolean requestHeadersComplete; + private boolean headersComplete; + private boolean failed; public HttpSender(HttpConnection connection) { @@ -112,9 +113,9 @@ public class HttpSender if (callback.completed()) { - if (!requestHeadersComplete) + if (!headersComplete) { - requestHeadersComplete = true; + headersComplete = true; notifyRequestHeadersComplete(request); } releaseBuffers(); @@ -134,7 +135,7 @@ public class HttpSender } case DONE: { - if (generator.isEnd()) + if (generator.isEnd() && !failed) success(); return; } @@ -158,10 +159,15 @@ public class HttpSender protected void success() { - HttpExchange exchange = this.exchange.getAndSet(null); - exchange.requestDone(); + // Cleanup first generator.reset(); - requestHeadersComplete = false; + headersComplete = false; + + // Notify after + HttpExchange exchange = this.exchange.getAndSet(null); + LOG.debug("{} succeeded", exchange.request()); + exchange.requestDone(true); + // It is important to notify *after* we reset because // the notification may trigger another request/response notifyRequestSuccess(exchange.request()); @@ -169,13 +175,18 @@ public class HttpSender protected void fail(Throwable failure) { + // Cleanup first BufferUtil.clear(header); BufferUtil.clear(chunk); releaseBuffers(); connection.getEndPoint().shutdownOutput(); generator.abort(); + failed = true; + + // Notify after HttpExchange exchange = this.exchange.getAndSet(null); - exchange.requestDone(); + exchange.requestDone(false); + notifyRequestFailure(exchange.request(), failure); notifyResponseFailure(exchange.listener(), failure); } diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/AbstractHttpClientTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java similarity index 95% rename from jetty-client-new/src/test/java/org/eclipse/jetty/client/AbstractHttpClientTest.java rename to jetty-client-new/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java index d5e2fceb72c..9ab203222cf 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/AbstractHttpClientTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/AbstractHttpClientServerTest.java @@ -6,7 +6,7 @@ import org.eclipse.jetty.server.SelectChannelConnector; import org.eclipse.jetty.server.Server; import org.junit.After; -public class AbstractHttpClientTest +public class AbstractHttpClientServerTest { protected Server server; protected HttpClient client; diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/EmptyHandler.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/EmptyHandler.java new file mode 100644 index 00000000000..6717a17d8aa --- /dev/null +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/EmptyHandler.java @@ -0,0 +1,18 @@ +package org.eclipse.jetty.client; + +import java.io.IOException; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.handler.AbstractHandler; + +public class EmptyHandler extends AbstractHandler +{ + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + baseRequest.setHandled(true); + } +} diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 5af56922d51..b56fa03aa2f 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -28,19 +28,12 @@ import org.eclipse.jetty.server.handler.AbstractHandler; import org.junit.Assert; import org.junit.Test; -public class HttpClientTest extends AbstractHttpClientTest +public class HttpClientTest extends AbstractHttpClientServerTest { @Test public void test_GET_NoResponseContent() throws Exception { - start(new AbstractHandler() - { - @Override - public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException - { - baseRequest.setHandled(true); - } - }); + start(new EmptyHandler()); Response response = client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS); @@ -70,7 +63,12 @@ public class HttpClientTest extends AbstractHttpClientTest Assert.assertArrayEquals(data, content); } + @Test + public void test_DestinationCount() throws Exception { + start(new EmptyHandler()); + + client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS); List destinations = client.getDestinations(); Assert.assertNotNull(destinations); diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpDestinationTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpDestinationTest.java new file mode 100644 index 00000000000..a243a49ed98 --- /dev/null +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpDestinationTest.java @@ -0,0 +1,143 @@ +package org.eclipse.jetty.client; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.toolchain.test.annotation.Slow; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +public class HttpDestinationTest extends AbstractHttpClientServerTest +{ + @Before + public void init() throws Exception + { + start(new EmptyHandler()); + } + + @Test + public void test_FirstAcquire_WithEmptyQueue() throws Exception + { + HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()); + Connection connection = destination.acquire(); + + // There are no available existing connections, so acquire() returns null + Assert.assertNull(connection); + + // There are no queued requests, so the newly created connection will be idle + connection = destination.idleConnections().poll(5, TimeUnit.SECONDS); + Assert.assertNotNull(connection); + } + + @Test + public void test_SecondAcquire_AfterFirstAcquire_WithEmptyQueue_ReturnsSameConnection() throws Exception + { + HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()); + Connection connection1 = destination.acquire(); + + // There are no available existing connections, so acquire() returns null + Assert.assertNull(connection1); + + // There are no queued requests, so the newly created connection will be idle + long start = System.nanoTime(); + while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) + { + connection1 = destination.idleConnections().peek(); + TimeUnit.MILLISECONDS.sleep(50); + } + Assert.assertNotNull(connection1); + + Connection connection2 = destination.acquire(); + Assert.assertSame(connection1, connection2); + } + + @Test + public void test_SecondAcquire_ConcurrentWithFirstAcquire_WithEmptyQueue_CreatesTwoConnections() throws Exception + { + final CountDownLatch latch = new CountDownLatch(1); + HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()) + { + @Override + protected void process(Connection connection) + { + try + { + latch.await(5, TimeUnit.SECONDS); + super.process(connection); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + }; + Connection connection1 = destination.acquire(); + + // There are no available existing connections, so acquire() returns null + Assert.assertNull(connection1); + + Connection connection2 = destination.acquire(); + Assert.assertNull(connection2); + + latch.countDown(); + + // There must be 2 idle connections + Connection connection = destination.idleConnections().poll(5, TimeUnit.SECONDS); + Assert.assertNotNull(connection); + connection = destination.idleConnections().poll(5, TimeUnit.SECONDS); + Assert.assertNotNull(connection); + } + + @Test + public void test_Acquire_Release_Acquire_ReturnsSameConnection() throws Exception + { + HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()); + Connection connection1 = destination.acquire(); + + // There are no available existing connections, so acquire() returns null + Assert.assertNull(connection1); + + // There are no queued requests, so the newly created connection will be idle + long start = System.nanoTime(); + while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) + { + connection1 = destination.idleConnections().peek(); + TimeUnit.MILLISECONDS.sleep(50); + } + Assert.assertNotNull(connection1); + + destination.release(connection1); + + Connection connection2 = destination.acquire(); + Assert.assertSame(connection1, connection2); + } + + @Slow + @Test + public void test_IdleConnection_IdleTimeout() throws Exception + { + long idleTimeout = 1000; + client.setIdleTimeout(idleTimeout); + + HttpDestination destination = new HttpDestination(client, "http", "localhost", connector.getLocalPort()); + destination.acquire(); + + // There are no queued requests, so the newly created connection will be idle + Connection connection1 = null; + long start = System.nanoTime(); + while (connection1 == null && TimeUnit.NANOSECONDS.toSeconds(System.nanoTime() - start) < 5) + { + connection1 = destination.idleConnections().peek(); + TimeUnit.MILLISECONDS.sleep(50); + } + Assert.assertNotNull(connection1); + + TimeUnit.MILLISECONDS.sleep(2 * idleTimeout); + + connection1 = destination.idleConnections().poll(); + Assert.assertNull(connection1); + } + +} diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java index ef81041b31f..996a7d4142a 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java @@ -28,7 +28,9 @@ public class HttpReceiverTest // "\r\n"); // final AtomicReference responseRef = new AtomicReference<>(); // final CountDownLatch latch = new CountDownLatch(1); -// HttpReceiver receiver = new HttpReceiver(connection, null, new Response.Listener.Adapter() +// HttpReceiver receiver = new HttpReceiver(connection); +// HttpExchange exchange = new HttpExchange(); +// , null, new Response.Listener.Adapter() // { // @Override // public void onSuccess(Response response) diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java index dda7353570f..a79ba4bee16 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java @@ -8,6 +8,7 @@ import java.util.concurrent.TimeUnit; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.toolchain.test.annotation.Slow; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -34,11 +35,11 @@ public class HttpSenderTest public void test_Send_NoRequestContent() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch successLatch = new CountDownLatch(1); - httpRequest.listener(new Request.Listener.Adapter() + request.listener(new Request.Listener.Adapter() { @Override public void onHeaders(Request request) @@ -52,7 +53,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(httpRequest, null); + connection.send(request, null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -61,31 +62,29 @@ public class HttpSenderTest Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); } + @Slow @Test public void test_Send_NoRequestContent_IncompleteFlush() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); - final CountDownLatch headersLatch = new CountDownLatch(1); - httpRequest.listener(new Request.Listener.Adapter() - { - @Override - public void onHeaders(Request request) - { - headersLatch.countDown(); - } - }); - connection.send(httpRequest, null); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); + connection.send(request, null); // This take will free space in the buffer and allow for the write to complete - StringBuilder request = new StringBuilder(endPoint.takeOutputString()); + StringBuilder builder = new StringBuilder(endPoint.takeOutputString()); // Wait for the write to complete - Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); - request.append(endPoint.takeOutputString()); + TimeUnit.SECONDS.sleep(1); - String requestString = request.toString(); + String chunk = endPoint.takeOutputString(); + while (chunk.length() > 0) + { + builder.append(chunk); + chunk = endPoint.takeOutputString(); + } + + String requestString = builder.toString(); Assert.assertTrue(requestString.startsWith("GET ")); Assert.assertTrue(requestString.endsWith("\r\n\r\n")); } @@ -96,10 +95,10 @@ public class HttpSenderTest ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); // Shutdown output to trigger the exception on write endPoint.shutdownOutput(); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch failureLatch = new CountDownLatch(2); - httpRequest.listener(new Request.Listener.Adapter() + request.listener(new Request.Listener.Adapter() { @Override public void onFailure(Request request, Throwable x) @@ -107,7 +106,7 @@ public class HttpSenderTest failureLatch.countDown(); } }); - connection.send(httpRequest, new Response.Listener.Adapter() + connection.send(request, new Response.Listener.Adapter() { @Override public void onFailure(Response response, Throwable failure) @@ -123,10 +122,10 @@ public class HttpSenderTest public void test_Send_NoRequestContent_IncompleteFlush_Exception() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint("", 16); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch failureLatch = new CountDownLatch(2); - httpRequest.listener(new Request.Listener.Adapter() + request.listener(new Request.Listener.Adapter() { @Override public void onFailure(Request request, Throwable x) @@ -134,7 +133,7 @@ public class HttpSenderTest failureLatch.countDown(); } }); - connection.send(httpRequest, new Response.Listener.Adapter() + connection.send(request, new Response.Listener.Adapter() { @Override public void onFailure(Response response, Throwable failure) @@ -156,13 +155,13 @@ public class HttpSenderTest public void test_Send_SmallRequestContent_InOneBuffer() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); String content = "abcdef"; - httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8")))); + request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content.getBytes("UTF-8")))); final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch successLatch = new CountDownLatch(1); - httpRequest.listener(new Request.Listener.Adapter() + request.listener(new Request.Listener.Adapter() { @Override public void onHeaders(Request request) @@ -176,7 +175,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(httpRequest, null); + connection.send(request, null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -189,14 +188,14 @@ public class HttpSenderTest public void test_Send_SmallRequestContent_InTwoBuffers() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); String content1 = "0123456789"; String content2 = "abcdef"; - httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))); + request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8")))); final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch successLatch = new CountDownLatch(1); - httpRequest.listener(new Request.Listener.Adapter() + request.listener(new Request.Listener.Adapter() { @Override public void onHeaders(Request request) @@ -210,7 +209,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(httpRequest, null); + connection.send(request, null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -223,11 +222,11 @@ public class HttpSenderTest public void test_Send_SmallRequestContent_Chunked_InTwoChunks() throws Exception { ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); - HttpConnection connection = new HttpConnection(client, endPoint); - Request httpRequest = new HttpRequest(client, URI.create("http://localhost/")); + HttpConnection connection = new HttpConnection(client, endPoint, null); + Request request = client.newRequest(URI.create("http://localhost/")); String content1 = "0123456789"; String content2 = "ABCDEF"; - httpRequest.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))) + request.content(new ByteBufferContentProvider(ByteBuffer.wrap(content1.getBytes("UTF-8")), ByteBuffer.wrap(content2.getBytes("UTF-8"))) { @Override public long length() @@ -237,7 +236,7 @@ public class HttpSenderTest }); final CountDownLatch headersLatch = new CountDownLatch(1); final CountDownLatch successLatch = new CountDownLatch(1); - httpRequest.listener(new Request.Listener.Adapter() + request.listener(new Request.Listener.Adapter() { @Override public void onHeaders(Request request) @@ -251,7 +250,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(httpRequest, null); + connection.send(request, null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -262,5 +261,4 @@ public class HttpSenderTest Assert.assertTrue(headersLatch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(successLatch.await(5, TimeUnit.SECONDS)); } - } diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java index bb096f5a0cc..06329974ad5 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java @@ -14,7 +14,7 @@ import org.junit.Assert; import org.junit.Before; import org.junit.Test; -public class RedirectionTest extends AbstractHttpClientTest +public class RedirectionTest extends AbstractHttpClientServerTest { @Before public void init() throws Exception diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java index 6c3f6f3a602..ccbf6ad344c 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/SelectorManager.java @@ -286,9 +286,11 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa @Override protected void doStop() throws Exception { + LOG.debug("Stopping {}", this); Stop stop = new Stop(); submit(stop); stop.await(getStopTimeout()); + LOG.debug("Stopped {}", this); } /**