From 2b5ec130031093e918f02cf5756404bd1c73d042 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 4 Sep 2012 23:44:35 +0200 Subject: [PATCH] Jetty9 - Third take at HTTP client implementation. --- .../org/eclipse/jetty/client/HttpClient.java | 22 +++++- .../eclipse/jetty/client/HttpConnection.java | 39 ++++++--- .../jetty/client/HttpContentResponse.java | 67 ++++++++++++++++ .../jetty/client/HttpConversation.java | 51 ++++-------- .../eclipse/jetty/client/HttpDestination.java | 79 +++++++++++++------ .../eclipse/jetty/client/HttpExchange.java | 69 ++++++++++++++++ .../eclipse/jetty/client/HttpReceiver.java | 66 ++++++++-------- .../org/eclipse/jetty/client/HttpRequest.java | 16 ++-- .../eclipse/jetty/client/HttpResponse.java | 9 ++- .../org/eclipse/jetty/client/HttpSender.java | 53 +++++++------ .../jetty/client/api/ContentResponse.java | 6 ++ .../org/eclipse/jetty/client/api/Request.java | 2 +- .../eclipse/jetty/client/HttpClientTest.java | 38 ++++++++- .../eclipse/jetty/client/RedirectionTest.java | 15 +++- .../org/eclipse/jetty/client/api/Usage.java | 4 +- .../test/resources/jetty-logging.properties | 2 +- 16 files changed, 396 insertions(+), 142 deletions(-) create mode 100644 jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java create mode 100644 jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java create mode 100644 jetty-client-new/src/main/java/org/eclipse/jetty/client/api/ContentResponse.java diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java index df7c2f58255..199b1f81a1a 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -20,7 +20,9 @@ import java.net.SocketAddress; import java.net.URI; import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; +import java.util.ArrayList; import java.util.Arrays; +import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executor; @@ -30,6 +32,7 @@ import java.util.concurrent.ScheduledExecutorService; import javax.net.ssl.SSLEngine; import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; @@ -85,6 +88,7 @@ import org.eclipse.jetty.util.thread.QueuedThreadPool; public class HttpClient extends AggregateLifeCycle { private static final Logger LOG = Log.getLogger(HttpClient.class); + private final ConcurrentMap destinations = new ConcurrentHashMap<>(); private final ConcurrentMap conversations = new ConcurrentHashMap<>(); private volatile Executor executor; @@ -131,6 +135,8 @@ public class HttpClient extends AggregateLifeCycle addBean(selectorManager); super.doStart(); + + LOG.info("Started {}", this); } protected SelectorManager newSelectorManager() @@ -172,12 +178,12 @@ public class HttpClient extends AggregateLifeCycle this.bindAddress = bindAddress; } - public Future GET(String uri) + public Future GET(String uri) { return GET(URI.create(uri)); } - public Future GET(URI uri) + public Future GET(URI uri) { return newRequest(uri).send(); } @@ -226,10 +232,17 @@ public class HttpClient extends AggregateLifeCycle Destination existing = destinations.putIfAbsent(address, destination); if (existing != null) destination = existing; + else + LOG.debug("Created {}", destination); } return destination; } + public List getDestinations() + { + return new ArrayList<>(destinations.values()); + } + public String getUserAgent() { return agent; @@ -345,13 +358,13 @@ public class HttpClient extends AggregateLifeCycle } } - public HttpConversation conversationFor(Request request, Response.Listener listener) + public HttpConversation conversationFor(Request request) { long id = request.id(); HttpConversation conversation = conversations.get(id); if (conversation == null) { - conversation = new HttpConversation(this, listener); + conversation = new HttpConversation(this, id); HttpConversation existing = conversations.putIfAbsent(id, conversation); if (existing != null) conversation = existing; @@ -364,6 +377,7 @@ public class HttpClient extends AggregateLifeCycle // TODO switch (status) { + case 302: case 303: return new RedirectionListener(this); } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java index af323eb1302..d29e30c01bd 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -1,5 +1,7 @@ package org.eclipse.jetty.client; +import java.util.concurrent.atomic.AtomicReference; + import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.Request; @@ -16,13 +18,17 @@ public class HttpConnection extends AbstractConnection implements Connection { private static final Logger LOG = Log.getLogger(HttpConnection.class); + private final AtomicReference exchange = new AtomicReference<>(); private final HttpClient client; - private volatile HttpConversation conversation; + private final HttpSender sender; + private final HttpReceiver receiver; public HttpConnection(HttpClient client, EndPoint endPoint) { super(endPoint, client.getExecutor()); this.client = client; + this.sender = new HttpSender(this); + this.receiver = new HttpReceiver(this); } public HttpClient getHttpClient() @@ -40,9 +46,9 @@ public class HttpConnection extends AbstractConnection implements Connection @Override protected boolean onReadTimeout() { - HttpConversation conversation = this.conversation; - if (conversation != null) - conversation.idleTimeout(); + HttpExchange exchange = this.exchange.get(); + if (exchange != null) + exchange.idleTimeout(); return true; } @@ -50,10 +56,18 @@ public class HttpConnection extends AbstractConnection implements Connection public void send(Request request, Response.Listener listener) { normalizeRequest(request); - HttpConversation conversation = client.conversationFor(request, listener); - this.conversation = conversation; - conversation.prepare(this, request, listener); - conversation.send(); + HttpConversation conversation = client.conversationFor(request); + HttpExchange exchange = new HttpExchange(conversation, sender, receiver, request, listener); + if (this.exchange.compareAndSet(null, exchange)) + { + conversation.add(exchange); + LOG.debug("{}({})", request, conversation); + exchange.send(); + } + else + { + throw new UnsupportedOperationException("Pipelined requests not supported"); + } } private void normalizeRequest(Request request) @@ -96,11 +110,10 @@ public class HttpConnection extends AbstractConnection implements Connection @Override public void onFillable() { - HttpConversation conversation = this.conversation; - if (conversation != null) - conversation.receive(); + HttpExchange exchange = this.exchange.get(); + if (exchange != null) + exchange.receive(); else - // TODO test sending white space... we want to consume it but throw if it's not whitespace - LOG.warn("Ready to read response, but no receiver"); + throw new IllegalStateException(); } } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java new file mode 100644 index 00000000000..627c1f28b6f --- /dev/null +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java @@ -0,0 +1,67 @@ +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpVersion; + +public class HttpContentResponse implements ContentResponse +{ + private final Response response; + private final BufferingResponseListener listener; + + public HttpContentResponse(Response response, BufferingResponseListener listener) + { + this.response = response; + this.listener = listener; + } + + @Override + public Request request() + { + return response.request(); + } + + @Override + public Listener listener() + { + return response.listener(); + } + + @Override + public HttpVersion version() + { + return response.version(); + } + + @Override + public int status() + { + return response.status(); + } + + @Override + public String reason() + { + return response.reason(); + } + + @Override + public HttpFields headers() + { + return response.headers(); + } + + @Override + public void abort() + { + response.abort(); + } + + @Override + public byte[] content() + { + return listener.content(); + } +} diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConversation.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConversation.java index 78ef60e3d30..0a89e431120 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConversation.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpConversation.java @@ -1,38 +1,25 @@ package org.eclipse.jetty.client; +import java.util.ArrayList; +import java.util.List; + import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; public class HttpConversation { - private final Response.Listener applicationListener; - private final HttpSender sender; - private final HttpReceiver receiver; + private final List exchanges = new ArrayList<>(); + private final HttpClient client; + private final long id; + private HttpConnection connection; private Request request; private Response.Listener listener; - private HttpResponse response; - public HttpConversation(HttpClient client, Response.Listener listener) + public HttpConversation(HttpClient client, long id) { - applicationListener = listener; - sender = new HttpSender(client); - receiver = new HttpReceiver(); - } - - public Response.Listener applicationListener() - { - return applicationListener; - } - - public void prepare(HttpConnection connection, Request request, Response.Listener listener) - { - if (this.connection != null) - throw new IllegalStateException(); - this.connection = connection; - this.request = request; - this.listener = listener; - this.response = new HttpResponse(request, listener); + this.client = client; + this.id = id; } public void done() @@ -67,23 +54,19 @@ public class HttpConversation this.listener = listener; } - public HttpResponse response() + public void add(HttpExchange exchange) { - return response; + exchanges.add(exchange); } - public void send() + public HttpExchange first() { - sender.send(this); + return exchanges.get(0); } - public void idleTimeout() + @Override + public String toString() { - receiver.idleTimeout(); - } - - public void receive() - { - receiver.receive(this); + return String.format("%s[%d]", HttpConversation.class.getSimpleName(), id); } } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java index 63111dfeefb..43491b00988 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -25,15 +25,19 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; public class HttpDestination implements Destination { + private static final Logger LOG = Log.getLogger(HttpDestination.class); + private final AtomicInteger connectionCount = new AtomicInteger(); private final HttpClient client; private final String scheme; private final String host; private final int port; - private final Queue requests; + private final Queue requests; private final Queue idleConnections; private final Queue activeConnections; @@ -76,21 +80,20 @@ public class HttpDestination implements Destination if (port != request.port()) throw new IllegalArgumentException("Invalid request port " + request.port() + " for destination " + this); - HttpResponse response = new HttpResponse(request, listener); - + RequestPair requestPair = new RequestPair(request, listener); if (client.isRunning()) { - if (requests.offer(response)) + if (requests.offer(requestPair)) { - if (!client.isRunning() && requests.remove(response)) + if (!client.isRunning() && requests.remove(requestPair)) { - throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is shutting down"); + throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is stopping"); } else { - Request.Listener requestListener = request.listener(); - notifyRequestQueued(requestListener, request); - ensureConnection(); + LOG.debug("Queued {}", request); + notifyRequestQueued(request.listener(), request); + ensureConnection(); // TODO: improve and test this } } else @@ -98,6 +101,10 @@ public class HttpDestination implements Destination throw new RejectedExecutionException("Max requests per address " + client.getMaxQueueSizePerAddress() + " exceeded"); } } + else + { + throw new RejectedExecutionException(HttpClient.class.getSimpleName() + " is stopped"); + } } private void notifyRequestQueued(Request.Listener listener, Request request) @@ -109,28 +116,33 @@ public class HttpDestination implements Destination } catch (Exception x) { - // TODO: log or abort request send ? + LOG.info("Exception while notifying listener " + listener, x); } } private void ensureConnection() { - int maxConnections = client.getMaxConnectionsPerAddress(); + final int maxConnections = client.getMaxConnectionsPerAddress(); while (true) { - int count = connectionCount.get(); + int current = connectionCount.get(); + final int next = current + 1; - if (count >= maxConnections) + if (next > maxConnections) + { + LOG.debug("Max connections reached {}: {}", this, current); break; + } - if (connectionCount.compareAndSet(count, count + 1)) + if (connectionCount.compareAndSet(current, next)) { newConnection(new Callback() { @Override public void completed(Connection connection) { - dispatch(connection); + LOG.debug("Created connection {}/{} for {}", next, maxConnections, this); + process(connection); } @Override @@ -157,16 +169,18 @@ public class HttpDestination implements Destination } /** - * Responsibility of this method is to dequeue a request, associate it to the given {@code connection} - * and dispatch a thread to execute the request. + *

Processes a new connection making it idle or active depending on whether requests are waiting to be sent.

+ *

A new connection is created when a request needs to be executed; it is possible that the request that + * triggered the request creation is executed by another connection that was just released, so the new connection + * may become idle.

+ *

If a request is waiting to be executed, it will be dequeued and executed by the new connection.

* - * This can be done in several ways: one could be to - * @param connection + * @param connection the new connection */ - protected void dispatch(final Connection connection) + protected void process(final Connection connection) { - final Response response = requests.poll(); - if (response == null) + final RequestPair requestPair = requests.poll(); + if (requestPair == null) { idleConnections.offer(connection); } @@ -178,7 +192,7 @@ public class HttpDestination implements Destination @Override public void run() { - connection.send(response.request(), response.listener()); + connection.send(requestPair.request, requestPair.listener); } }); } @@ -197,4 +211,23 @@ public class HttpDestination implements Destination // if I create manually the connection, then I call send(request, listener) // Other ways ? + + + @Override + public String toString() + { + return String.format("%s(%s://%s:%d)", HttpDestination.class.getSimpleName(), scheme(), host(), port()); + } + + private static class RequestPair + { + private final Request request; + private final Response.Listener listener; + + public RequestPair(Request request, Response.Listener listener) + { + this.request = request; + this.listener = listener; + } + } } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java new file mode 100644 index 00000000000..a7a64cf77b6 --- /dev/null +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -0,0 +1,69 @@ +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Request; +import org.eclipse.jetty.client.api.Response; + +public class HttpExchange +{ + private final HttpConversation conversation; + private final HttpSender sender; + private final HttpReceiver receiver; + private final Request request; + private final Response.Listener listener; + private final HttpResponse response; + + public HttpExchange(HttpConversation conversation, HttpSender sender, HttpReceiver receiver, Request request, Response.Listener listener) + { + this.conversation = conversation; + this.sender = sender; + this.receiver = receiver; + this.request = request; + this.listener = listener; + this.response = new HttpResponse(request, listener); + } + + public HttpConversation conversation() + { + return conversation; + } + + public Request request() + { + return request; + } + + public Response.Listener listener() + { + return listener; + } + + public HttpResponse response() + { + return response; + } + + public void send() + { + sender.send(this); + } + + public void idleTimeout() + { + receiver.idleTimeout(); + } + + public void receive() + { + receiver.receive(this); + } + + public void requestDone() + { + // TODO + } + + public void responseDone() + { + // TODO + } +} diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index a58ca64b2a8..69e7aac8ff0 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -4,6 +4,7 @@ import java.io.EOFException; import java.io.IOException; import java.nio.ByteBuffer; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpHeader; @@ -14,20 +15,24 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -class HttpReceiver implements HttpParser.ResponseHandler +public class HttpReceiver implements HttpParser.ResponseHandler { private static final Logger LOG = Log.getLogger(HttpReceiver.class); private final HttpParser parser = new HttpParser(this); - private HttpConversation conversation; + private final AtomicReference exchange = new AtomicReference<>(); + private final AtomicReference listener = new AtomicReference<>(); + private final HttpConnection connection; - public void receive(HttpConversation conversation) + public HttpReceiver(HttpConnection connection) { - if (this.conversation != null) - throw new IllegalStateException(); - this.conversation = conversation; + this.connection = connection; + } + + public void receive(HttpExchange exchange) + { + this.exchange.set(exchange); - HttpConnection connection = conversation.connection(); HttpClient client = connection.getHttpClient(); ByteBufferPool bufferPool = client.getByteBufferPool(); ByteBuffer buffer = bufferPool.acquire(client.getResponseBufferSize(), true); @@ -40,7 +45,6 @@ class HttpReceiver implements HttpParser.ResponseHandler if (read > 0) { parser.parseNext(buffer); - // TODO: response done, reset ? } else if (read == 0) { @@ -65,39 +69,44 @@ class HttpReceiver implements HttpParser.ResponseHandler @Override public boolean startResponse(HttpVersion version, int status, String reason) { - HttpResponse response = conversation.response(); - response.version(version).status(status).reason(reason); + HttpExchange exchange = this.exchange.get(); // Probe the protocol listeners - HttpClient client = conversation.connection().getHttpClient(); + HttpClient client = connection.getHttpClient(); + HttpResponse response = exchange.response(); Response.Listener listener = client.lookup(status); - if (listener != null) - conversation.listener(listener); - else - conversation.listener(conversation.applicationListener()); + if (listener == null) + listener = exchange.conversation().first().listener(); + this.listener.set(listener); - notifyBegin(conversation.listener(), response); + response.version(version).status(status).reason(reason); + LOG.debug("{}", response); + + notifyBegin(listener, response); return false; } @Override public boolean parsedHeader(HttpHeader header, String name, String value) { - conversation.response().headers().put(name, value); + HttpExchange exchange = this.exchange.get(); + exchange.response().headers().put(name, value); return false; } @Override public boolean headerComplete() { - notifyHeaders(conversation.listener(), conversation.response()); + HttpExchange exchange = this.exchange.get(); + notifyHeaders(listener.get(), exchange.response()); return false; } @Override public boolean content(ByteBuffer buffer) { - notifyContent(conversation.listener(), conversation.response(), buffer); + HttpExchange exchange = this.exchange.get(); + notifyContent(listener.get(), exchange.response(), buffer); return false; } @@ -110,21 +119,16 @@ class HttpReceiver implements HttpParser.ResponseHandler protected void success() { - HttpConversation conversation = this.conversation; - this.conversation = null; - Response.Listener listener = conversation.listener(); - Response response = conversation.response(); - conversation.done(); - notifySuccess(listener, response); + HttpExchange exchange = this.exchange.getAndSet(null); + exchange.responseDone(); + notifySuccess(listener.get(), exchange.response()); } protected void fail(Throwable failure) { - Response.Listener listener = conversation.listener(); - Response response = conversation.response(); - conversation.done(); - conversation = null; - notifyFailure(listener, response, failure); + HttpExchange exchange = this.exchange.getAndSet(null); + exchange.responseDone(); + notifyFailure(listener.get(), exchange.response(), failure); } @Override @@ -137,7 +141,7 @@ class HttpReceiver implements HttpParser.ResponseHandler @Override public void badMessage(int status, String reason) { - conversation.response().status(status).reason(reason); + exchange.get().response().status(status).reason(reason); fail(new HttpResponseException()); } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java index b5a2246e607..23dd6cc33db 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.eclipse.jetty.client.api.ContentDecoder; import org.eclipse.jetty.client.api.ContentProvider; +import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; @@ -41,7 +42,6 @@ public class HttpRequest implements Request private HttpVersion version; private String agent; private long idleTimeout; - private Response response; private Listener listener; private ContentProvider content; private final HttpFields headers = new HttpFields(); @@ -228,23 +228,23 @@ public class HttpRequest implements Request } @Override - public Future send() + public Future send() { - final FutureCallback result = new FutureCallback<>(); + final FutureCallback result = new FutureCallback<>(); BufferingResponseListener listener = new BufferingResponseListener() { @Override public void onSuccess(Response response) { super.onSuccess(response); - result.completed(response); + result.completed(new HttpContentResponse(response, this)); } @Override public void onFailure(Response response, Throwable failure) { super.onFailure(response, failure); - result.failed(response, failure); + result.failed(new HttpContentResponse(response, this), failure); } }; send(listener); @@ -256,4 +256,10 @@ public class HttpRequest implements Request { client.send(this, listener); } + + @Override + public String toString() + { + return String.format("%s[%s %s %s]@%x", HttpRequest.class.getSimpleName(), method(), path(), version(), hashCode()); + } } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpResponse.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpResponse.java index d20ebf0eb89..4d724686ebc 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpResponse.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpResponse.java @@ -17,9 +17,8 @@ import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpVersion; -import org.eclipse.jetty.util.FutureCallback; -public class HttpResponse extends FutureCallback implements Response +public class HttpResponse implements Response { private final HttpFields headers = new HttpFields(); private final Request request; @@ -91,4 +90,10 @@ public class HttpResponse extends FutureCallback implements Response { // request.abort(); } + + @Override + public String toString() + { + return String.format("%s[%s %d %s]", HttpResponse.class.getSimpleName(), version(), status(), reason()); + } } diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java index d75491a4255..7edd1cdfb07 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -18,40 +18,48 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -class HttpSender +public class HttpSender { private static final Logger LOG = Log.getLogger(HttpSender.class); private final HttpGenerator generator = new HttpGenerator(); - private final HttpClient client; - private HttpConversation conversation; + private final AtomicReference exchange = new AtomicReference<>(); + private final HttpConnection connection; + private long contentLength; private Iterator contentChunks; private ByteBuffer header; private ByteBuffer chunk; private boolean requestHeadersComplete; - HttpSender(HttpClient client) + public HttpSender(HttpConnection connection) { - this.client = client; + this.connection = connection; } - public void send(HttpConversation conversation) + public void send(HttpExchange exchange) { - this.conversation = conversation; - ContentProvider content = conversation.request().content(); - this.contentLength = content == null ? -1 : content.length(); - this.contentChunks = content == null ? Collections.emptyIterator() : content.iterator(); - send(); + if (this.exchange.compareAndSet(null, exchange)) + { + ContentProvider content = exchange.request().content(); + this.contentLength = content == null ? -1 : content.length(); + this.contentChunks = content == null ? Collections.emptyIterator() : content.iterator(); + send(); + } + else + { + throw new IllegalStateException(); + } } private void send() { try { - HttpConnection connection = conversation.connection(); + HttpClient client = connection.getHttpClient(); EndPoint endPoint = connection.getEndPoint(); ByteBufferPool byteBufferPool = client.getByteBufferPool(); + final Request request = exchange.get().request(); HttpGenerator.RequestInfo info = null; ByteBuffer content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER; boolean lastContent = !contentChunks.hasNext(); @@ -62,7 +70,6 @@ class HttpSender { case NEED_INFO: { - Request request = conversation.request(); info = new HttpGenerator.RequestInfo(request.version(), request.headers(), contentLength, request.method().asString(), request.path()); break; } @@ -83,7 +90,7 @@ class HttpSender @Override protected void pendingCompleted() { - notifyRequestHeadersComplete(conversation.request()); + notifyRequestHeadersComplete(request); send(); } @@ -108,7 +115,7 @@ class HttpSender if (!requestHeadersComplete) { requestHeadersComplete = true; - notifyRequestHeadersComplete(conversation.request()); + notifyRequestHeadersComplete(request); } releaseBuffers(); content = contentChunks.hasNext() ? contentChunks.next() : BufferUtil.EMPTY_BUFFER; @@ -151,13 +158,13 @@ class HttpSender protected void success() { - Request request = conversation.request(); - conversation = null; + HttpExchange exchange = this.exchange.getAndSet(null); + exchange.requestDone(); generator.reset(); requestHeadersComplete = false; // It is important to notify *after* we reset because // the notification may trigger another request/response - notifyRequestSuccess(request); + notifyRequestSuccess(exchange.request()); } protected void fail(Throwable failure) @@ -165,15 +172,17 @@ class HttpSender BufferUtil.clear(header); BufferUtil.clear(chunk); releaseBuffers(); - conversation.connection().getEndPoint().shutdownOutput(); + connection.getEndPoint().shutdownOutput(); generator.abort(); - notifyRequestFailure(conversation.request(), failure); - notifyResponseFailure(conversation.listener(), failure); + HttpExchange exchange = this.exchange.getAndSet(null); + exchange.requestDone(); + notifyRequestFailure(exchange.request(), failure); + notifyResponseFailure(exchange.listener(), failure); } private void releaseBuffers() { - ByteBufferPool bufferPool = client.getByteBufferPool(); + ByteBufferPool bufferPool = connection.getHttpClient().getByteBufferPool(); if (!BufferUtil.hasContent(header)) { bufferPool.release(header); diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/ContentResponse.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/ContentResponse.java new file mode 100644 index 00000000000..6931c557018 --- /dev/null +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/ContentResponse.java @@ -0,0 +1,6 @@ +package org.eclipse.jetty.client.api; + +public interface ContentResponse extends Response +{ + byte[] content(); +} diff --git a/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/Request.java index 84520587b64..cf645840f4b 100644 --- a/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/Request.java +++ b/jetty-client-new/src/main/java/org/eclipse/jetty/client/api/Request.java @@ -74,7 +74,7 @@ public interface Request Request listener(Listener listener); - Future send(); + Future send(); void send(Response.Listener listener); diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index 679c1bb34e0..5af56922d51 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -14,21 +14,24 @@ package org.eclipse.jetty.client; import java.io.IOException; +import java.util.List; import java.util.concurrent.TimeUnit; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import junit.framework.Assert; +import org.eclipse.jetty.client.api.ContentResponse; +import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.handler.AbstractHandler; +import org.junit.Assert; import org.junit.Test; public class HttpClientTest extends AbstractHttpClientTest { @Test - public void testGETNoResponseContent() throws Exception + public void test_GET_NoResponseContent() throws Exception { start(new AbstractHandler() { @@ -44,4 +47,35 @@ public class HttpClientTest extends AbstractHttpClientTest Assert.assertNotNull(response); Assert.assertEquals(200, response.status()); } + + @Test + public void test_GET_ResponseContent() throws Exception + { + final byte[] data = new byte[]{0, 1, 2, 3, 4, 5, 6, 7}; + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + response.getOutputStream().write(data); + baseRequest.setHandled(true); + } + }); + + ContentResponse response = client.GET("http://localhost:" + connector.getLocalPort()).get(5, TimeUnit.SECONDS); + + Assert.assertNotNull(response); + Assert.assertEquals(200, response.status()); + byte[] content = response.content(); + Assert.assertArrayEquals(data, content); + } + + { + + List destinations = client.getDestinations(); + Assert.assertNotNull(destinations); + Assert.assertEquals(1, destinations.size()); + Destination destination = destinations.get(0); + Assert.assertNotNull(destination); + } } diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java index f41f5c8afd2..bb096f5a0cc 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/RedirectionTest.java @@ -23,11 +23,22 @@ public class RedirectionTest extends AbstractHttpClientTest } @Test - public void test303() throws Exception + public void test_303() throws Exception { Response response = client.newRequest("localhost", connector.getLocalPort()) .path("/303/done") - .send().get(500, TimeUnit.SECONDS); + .send().get(5, TimeUnit.SECONDS); + Assert.assertNotNull(response); + Assert.assertEquals(200, response.status()); + Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString())); + } + + @Test + public void test_303_302() throws Exception + { + Response response = client.newRequest("localhost", connector.getLocalPort()) + .path("/303/302/done") + .send().get(5, TimeUnit.SECONDS); Assert.assertNotNull(response); Assert.assertEquals(200, response.status()); Assert.assertFalse(response.headers().containsKey(HttpHeader.LOCATION.asString())); diff --git a/jetty-client-new/src/test/java/org/eclipse/jetty/client/api/Usage.java b/jetty-client-new/src/test/java/org/eclipse/jetty/client/api/Usage.java index 3452e12db1e..242cf36d403 100644 --- a/jetty-client-new/src/test/java/org/eclipse/jetty/client/api/Usage.java +++ b/jetty-client-new/src/test/java/org/eclipse/jetty/client/api/Usage.java @@ -38,7 +38,7 @@ public class Usage public void testGETBlocking_ShortAPI() throws Exception { HttpClient client = new HttpClient(); - Future responseFuture = client.GET("http://localhost:8080/foo"); + Future responseFuture = client.GET("http://localhost:8080/foo"); Response response = responseFuture.get(); Assert.assertEquals(200, response.status()); // Headers abstraction needed for: @@ -66,7 +66,7 @@ public class Usage .decoder(null) .content(null) .idleTimeout(5000L); - Future responseFuture = request.send(); + Future responseFuture = request.send(); Response response = responseFuture.get(); Assert.assertEquals(200, response.status()); } diff --git a/jetty-client-new/src/test/resources/jetty-logging.properties b/jetty-client-new/src/test/resources/jetty-logging.properties index 276e49dd14a..01824b988bf 100644 --- a/jetty-client-new/src/test/resources/jetty-logging.properties +++ b/jetty-client-new/src/test/resources/jetty-logging.properties @@ -1,2 +1,2 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.LEVEL=DEBUG +org.eclipse.jetty.client.LEVEL=DEBUG