From 0d762bcdbc11cd9e5c6d20cad2ec919ba6b060cc Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 30 Oct 2012 19:21:49 +0100 Subject: [PATCH] HTTP client: refactored response listeners to support lambdas. --- .../client/AuthenticationProtocolHandler.java | 17 +-- .../jetty/client/ContinueProtocolHandler.java | 20 +++- .../jetty/client/DoubleResponseListener.java | 80 ------------- .../org/eclipse/jetty/client/HttpClient.java | 20 +++- .../eclipse/jetty/client/HttpConnection.java | 15 ++- .../jetty/client/HttpContentResponse.java | 5 +- .../jetty/client/HttpConversation.java | 11 +- .../eclipse/jetty/client/HttpDestination.java | 81 ++++++------- .../eclipse/jetty/client/HttpExchange.java | 20 ++-- .../eclipse/jetty/client/HttpReceiver.java | 45 +++++--- .../org/eclipse/jetty/client/HttpRequest.java | 60 ++++++++-- .../eclipse/jetty/client/HttpResponse.java | 17 ++- .../org/eclipse/jetty/client/HttpSender.java | 21 ++-- .../jetty/client/RedirectProtocolHandler.java | 22 ++-- .../eclipse/jetty/client/RequestNotifier.java | 10 +- .../jetty/client/ResponseNotifier.java | 106 ++++++++++++------ .../eclipse/jetty/client/api/Connection.java | 2 +- .../org/eclipse/jetty/client/api/Request.java | 46 ++++++-- .../eclipse/jetty/client/api/Response.java | 41 +++++-- .../client/util/BlockingResponseListener.java | 1 - .../util/BufferingResponseListener.java | 5 +- .../client/HttpClientAuthenticationTest.java | 15 ++- .../jetty/client/HttpClientLoadTest.java | 6 +- .../eclipse/jetty/client/HttpClientTest.java | 10 +- .../client/HttpConnectionLifecycleTest.java | 6 +- .../jetty/client/HttpReceiverTest.java | 15 +-- .../jetty/client/HttpResponseAbortTest.java | 17 ++- .../eclipse/jetty/client/HttpSenderTest.java | 14 +-- 28 files changed, 424 insertions(+), 304 deletions(-) delete mode 100644 jetty-client/src/main/java/org/eclipse/jetty/client/DoubleResponseListener.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java index 97599f6f8c3..c1e2dd722fa 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AuthenticationProtocolHandler.java @@ -80,13 +80,13 @@ public class AuthenticationProtocolHandler implements ProtocolHandler { Request request = result.getRequest(); HttpConversation conversation = client.getConversation(request.getConversationID(), false); - Response.Listener listener = conversation.getExchanges().peekFirst().getResponseListener(); + List listeners = conversation.getExchanges().peekFirst().getResponseListeners(); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding()); if (result.isFailed()) { Throwable failure = result.getFailure(); LOG.debug("Authentication challenge failed {}", failure); - notifier.forwardFailureComplete(listener, request, result.getRequestFailure(), response, result.getResponseFailure()); + notifier.forwardFailureComplete(listeners, request, result.getRequestFailure(), response, result.getResponseFailure()); return; } @@ -94,7 +94,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler if (wwwAuthenticates.isEmpty()) { LOG.debug("Authentication challenge without WWW-Authenticate header"); - notifier.forwardFailureComplete(listener, request, null, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response)); + notifier.forwardFailureComplete(listeners, request, null, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response)); return; } @@ -113,7 +113,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler if (authentication == null) { LOG.debug("No authentication available for {}", request); - notifier.forwardSuccessComplete(listener, request, response); + notifier.forwardSuccessComplete(listeners, request, response); return; } @@ -121,19 +121,20 @@ public class AuthenticationProtocolHandler implements ProtocolHandler LOG.debug("Authentication result {}", authnResult); if (authnResult == null) { - notifier.forwardSuccessComplete(listener, request, response); + notifier.forwardSuccessComplete(listeners, request, response); return; } - authnResult.apply(request); - request.send(new Response.Listener.Empty() + Request newRequest = client.copyRequest(request, request.getURI()); + authnResult.apply(newRequest); + newRequest.onResponseSuccess(new Response.SuccessListener() { @Override public void onSuccess(Response response) { client.getAuthenticationStore().addAuthenticationResult(authnResult); } - }); + }).send(null); } private List parseWWWAuthenticate(Response response) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java index a9dec3af342..8588914571a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ContinueProtocolHandler.java @@ -18,8 +18,11 @@ package org.eclipse.jetty.client; +import java.util.List; + import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeaderValue; @@ -67,14 +70,14 @@ public class ContinueProtocolHandler implements ProtocolHandler HttpExchange exchange = conversation.getExchanges().peekLast(); assert exchange.getResponse() == response; - Response.Listener listener = exchange.getResponseListener(); + List listeners = exchange.getResponseListeners(); switch (response.getStatus()) { case 100: { // All good, continue exchange.resetResponse(true); - conversation.setResponseListener(listener); + conversation.setResponseListeners(listeners); exchange.proceed(true); break; } @@ -82,8 +85,8 @@ public class ContinueProtocolHandler implements ProtocolHandler { // Server either does not support 100 Continue, or it does and wants to refuse the request content HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding()); - notifier.forwardSuccess(listener, contentResponse); - conversation.setResponseListener(listener); + notifier.forwardSuccess(listeners, contentResponse); + conversation.setResponseListeners(listeners); exchange.proceed(false); break; } @@ -99,9 +102,14 @@ public class ContinueProtocolHandler implements ProtocolHandler HttpExchange exchange = conversation.getExchanges().peekLast(); assert exchange.getResponse() == response; - Response.Listener listener = exchange.getResponseListener(); + List listeners = exchange.getResponseListeners(); HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding()); - notifier.forwardFailureComplete(listener, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure); + notifier.forwardFailureComplete(listeners, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure); + } + + @Override + public void onComplete(Result result) + { } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/DoubleResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/DoubleResponseListener.java deleted file mode 100644 index ba2587bc203..00000000000 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/DoubleResponseListener.java +++ /dev/null @@ -1,80 +0,0 @@ -// -// ======================================================================== -// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== -// - -package org.eclipse.jetty.client; - -import java.nio.ByteBuffer; - -import org.eclipse.jetty.client.api.Response; -import org.eclipse.jetty.client.api.Result; - -public class DoubleResponseListener implements Response.Listener -{ - private final ResponseNotifier responseNotifier; - private final Response.Listener listener1; - private final Response.Listener listener2; - - public DoubleResponseListener(ResponseNotifier responseNotifier, Response.Listener listener1, Response.Listener listener2) - { - this.responseNotifier = responseNotifier; - this.listener1 = listener1; - this.listener2 = listener2; - } - - @Override - public void onBegin(Response response) - { - responseNotifier.notifyBegin(listener1, response); - responseNotifier.notifyBegin(listener2, response); - } - - @Override - public void onHeaders(Response response) - { - responseNotifier.notifyHeaders(listener1, response); - responseNotifier.notifyHeaders(listener2, response); - } - - @Override - public void onContent(Response response, ByteBuffer content) - { - responseNotifier.notifyContent(listener1, response, content); - responseNotifier.notifyContent(listener2, response, content); - } - - @Override - public void onSuccess(Response response) - { - responseNotifier.notifySuccess(listener1, response); - responseNotifier.notifySuccess(listener2, response); - } - - @Override - public void onFailure(Response response, Throwable failure) - { - responseNotifier.notifyFailure(listener1, response, failure); - responseNotifier.notifyFailure(listener2, response, failure); - } - - @Override - public void onComplete(Result result) - { - responseNotifier.notifyComplete(listener1, result); - responseNotifier.notifyComplete(listener2, result); - } -} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java index a75d04bd4ca..224c16d464c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpClient.java @@ -45,6 +45,7 @@ import org.eclipse.jetty.client.api.CookieStore; import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; +import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; @@ -265,9 +266,15 @@ public class HttpClient extends ContainerLifeCycle return new HttpRequest(this, uri); } - protected Request newRequest(long id, String uri) + protected Request copyRequest(Request oldRequest, String newURI) { - return new HttpRequest(this, id, URI.create(uri)); + Request newRequest = new HttpRequest(this, oldRequest.getConversationID(), URI.create(newURI)); + newRequest.method(oldRequest.getMethod()) + .version(oldRequest.getVersion()) + .content(oldRequest.getContent()); + for (HttpFields.Field header : oldRequest.getHeaders()) + newRequest.header(header.getName(), header.getValue()); + return newRequest; } private String address(String scheme, String host, int port) @@ -307,7 +314,7 @@ public class HttpClient extends ContainerLifeCycle return new ArrayList(destinations.values()); } - protected void send(final Request request, Response.Listener listener) + protected void send(final Request request, List listeners) { String scheme = request.getScheme().toLowerCase(); if (!Arrays.asList("http", "https").contains(scheme)) @@ -317,11 +324,12 @@ public class HttpClient extends ContainerLifeCycle if (port < 0) port = "https".equals(scheme) ? 443 : 80; - if (listener instanceof Schedulable) - ((Schedulable)listener).schedule(scheduler); + for (Response.ResponseListener listener : listeners) + if (listener instanceof Schedulable) + ((Schedulable)listener).schedule(scheduler); HttpDestination destination = provideDestination(scheme, request.getHost(), port); - destination.send(request, listener); + destination.send(request, listeners); } protected void newConnection(HttpDestination destination, Callback callback) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index ee58c05f9de..aae92bcac98 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.client; import java.io.UnsupportedEncodingException; import java.net.URLEncoder; import java.nio.charset.UnsupportedCharsetException; +import java.util.Collections; import java.util.Enumeration; import java.util.Iterator; import java.util.List; @@ -102,7 +103,12 @@ public class HttpConnection extends AbstractConnection implements Connection } @Override - public void send(Request request, Response.Listener listener) + public void send(Request request, Response.CompleteListener listener) + { + send(request, Collections.singletonList(listener)); + } + + public void send(Request request, List listeners) { normalizeRequest(request); @@ -112,12 +118,13 @@ public class HttpConnection extends AbstractConnection implements Connection endPoint.setIdleTimeout(request.getIdleTimeout()); HttpConversation conversation = client.getConversation(request.getConversationID(), true); - HttpExchange exchange = new HttpExchange(conversation, this, request, listener); + HttpExchange exchange = new HttpExchange(conversation, this, request, listeners); setExchange(exchange); conversation.getExchanges().offer(exchange); - if (listener instanceof Schedulable) - ((Schedulable)listener).schedule(client.getScheduler()); + for (Response.ResponseListener listener : listeners) + if (listener instanceof Schedulable) + ((Schedulable)listener).schedule(client.getScheduler()); sender.send(exchange); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java index 97aad12d9ad..aa9dd777eab 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpContentResponse.java @@ -20,6 +20,7 @@ package org.eclipse.jetty.client; import java.io.UnsupportedEncodingException; import java.nio.charset.UnsupportedCharsetException; +import java.util.List; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Response; @@ -46,9 +47,9 @@ public class HttpContentResponse implements ContentResponse } @Override - public Listener getListener() + public List getListeners(Class listenerClass) { - return response.getListener(); + return response.getListeners(listenerClass); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java index abaa3030b32..bacb391e20e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConversation.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.client; import java.util.Collections; import java.util.Deque; import java.util.Enumeration; +import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedDeque; @@ -34,7 +35,7 @@ public class HttpConversation implements Attributes private final Deque exchanges = new ConcurrentLinkedDeque<>(); private final HttpClient client; private final long id; - private volatile Response.Listener listener; + private volatile List listeners; public HttpConversation(HttpClient client, long id) { @@ -52,14 +53,14 @@ public class HttpConversation implements Attributes return exchanges; } - public Response.Listener getResponseListener() + public List getResponseListeners() { - return listener; + return listeners; } - public void setResponseListener(Response.Listener listener) + public void setResponseListeners(List listeners) { - this.listener = listener; + this.listeners = listeners; } public void complete() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java index a9a66d2b657..4145d1d6bf2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpDestination.java @@ -50,7 +50,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable private final String scheme; private final String host; private final int port; - private final Queue requests; + private final Queue requests; private final BlockingQueue idleConnections; private final BlockingQueue activeConnections; private final RequestNotifier requestNotifier; @@ -97,7 +97,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable return port; } - public void send(Request request, Response.Listener listener) + public void send(Request request, List listeners) { if (!scheme.equals(request.getScheme())) throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this); @@ -107,12 +107,12 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable if (port >= 0 && this.port != port) throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this); - RequestPair requestPair = new RequestPair(request, listener); + RequestContext requestContext = new RequestContext(request, listeners); if (client.isRunning()) { - if (requests.offer(requestPair)) + if (requests.offer(requestContext)) { - if (!client.isRunning() && requests.remove(requestPair)) + if (!client.isRunning() && requests.remove(requestContext)) { throw new RejectedExecutionException(client + " is stopping"); } @@ -202,15 +202,15 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable private void drain(Throwable x) { - RequestPair pair; - while ((pair = requests.poll()) != null) + RequestContext requestContext; + while ((requestContext = requests.poll()) != null) { - Request request = pair.request; + Request request = requestContext.request; requestNotifier.notifyFailure(request, x); - Response.Listener listener = pair.listener; - HttpResponse response = new HttpResponse(request, listener); - responseNotifier.notifyFailure(listener, response, x); - responseNotifier.notifyComplete(listener, new Result(request, x, response, x)); + List listeners = requestContext.listeners; + HttpResponse response = new HttpResponse(request, listeners); + responseNotifier.notifyFailure(listeners, response, x); + responseNotifier.notifyComplete(listeners, new Result(request, x, response, x)); } } @@ -223,37 +223,40 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable * * @param connection the new connection */ - protected void process(final Connection connection, boolean dispatch) + protected void process(Connection connection, boolean dispatch) { - RequestPair requestPair = requests.poll(); - if (requestPair == null) + // Ugly cast, but lack of generic reification forces it + final HttpConnection httpConnection = (HttpConnection)connection; + + RequestContext requestContext = requests.poll(); + if (requestContext == null) { - LOG.debug("{} idle", connection); - if (!idleConnections.offer(connection)) + LOG.debug("{} idle", httpConnection); + if (!idleConnections.offer(httpConnection)) { LOG.debug("{} idle overflow"); - connection.close(); + httpConnection.close(); } if (!client.isRunning()) { LOG.debug("{} is stopping", client); - remove(connection); - connection.close(); + remove(httpConnection); + httpConnection.close(); } } else { - final Request request = requestPair.request; - final Response.Listener listener = requestPair.listener; + final Request request = requestContext.request; + final List listeners = requestContext.listeners; if (request.aborted()) { - abort(request, listener, "Aborted"); + abort(request, listeners, "Aborted"); LOG.debug("Aborted {} before processing", request); } else { - LOG.debug("{} active", connection); - if (!activeConnections.offer(connection)) + LOG.debug("{} active", httpConnection); + if (!activeConnections.offer(httpConnection)) { LOG.warn("{} active overflow"); } @@ -264,13 +267,13 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable @Override public void run() { - connection.send(request, listener); + httpConnection.send(request, listeners); } }); } else { - connection.send(request, listener); + httpConnection.send(request, listeners); } } } @@ -333,14 +336,14 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable public boolean abort(Request request, String reason) { - for (RequestPair pair : requests) + for (RequestContext requestContext : requests) { - if (pair.request == request) + if (requestContext.request == request) { - if (requests.remove(pair)) + if (requests.remove(requestContext)) { // We were able to remove the pair, so it won't be processed - abort(request, pair.listener, reason); + abort(request, requestContext.listeners, reason); LOG.debug("Aborted {} while queued", request); return true; } @@ -349,13 +352,13 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable return false; } - private void abort(Request request, Response.Listener listener, String reason) + private void abort(Request request, List listeners, String reason) { - HttpResponse response = new HttpResponse(request, listener); + HttpResponse response = new HttpResponse(request, listeners); HttpResponseException responseFailure = new HttpResponseException(reason, response); - responseNotifier.notifyFailure(listener, response, responseFailure); + responseNotifier.notifyFailure(listeners, response, responseFailure); HttpRequestException requestFailure = new HttpRequestException(reason, request); - responseNotifier.notifyComplete(listener, new Result(request, requestFailure, response, responseFailure)); + responseNotifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure)); } @Override @@ -382,15 +385,15 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable return String.format("%s(%s://%s:%d)", HttpDestination.class.getSimpleName(), getScheme(), getHost(), getPort()); } - private static class RequestPair + private static class RequestContext { private final Request request; - private final Response.Listener listener; + private final List listeners; - private RequestPair(Request request, Response.Listener listener) + private RequestContext(Request request, List listeners) { this.request = request; - this.listener = listener; + this.listeners = listeners; } } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 7dfff3652d7..8abcd3d2201 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -18,6 +18,7 @@ package org.eclipse.jetty.client; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicMarkableReference; @@ -37,19 +38,19 @@ public class HttpExchange private final HttpConversation conversation; private final HttpConnection connection; private final Request request; - private final Response.Listener listener; + private final List listeners; private final HttpResponse response; private volatile boolean last; private volatile Throwable requestFailure; private volatile Throwable responseFailure; - public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, Response.Listener listener) + public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, List listeners) { this.conversation = conversation; this.connection = connection; this.request = request; - this.listener = listener; - this.response = new HttpResponse(request, listener); + this.listeners = listeners; + this.response = new HttpResponse(request, listeners); } public HttpConversation getConversation() @@ -67,9 +68,9 @@ public class HttpExchange return requestFailure; } - public Response.Listener getResponseListener() + public List getResponseListeners() { - return listener; + return listeners; } public HttpResponse getResponse() @@ -179,9 +180,10 @@ public class HttpExchange if (isLast()) { HttpExchange first = conversation.getExchanges().peekFirst(); - Response.Listener listener = first.getResponseListener(); - if (listener instanceof Schedulable) - ((Schedulable)listener).cancel(); + List listeners = first.getResponseListeners(); + for (Response.ResponseListener listener : listeners) + if (listener instanceof Schedulable) + ((Schedulable)listener).cancel(); conversation.complete(); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 2ae781d525b..b922ef24fa4 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.client; import java.io.EOFException; import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Collections; import java.util.Enumeration; import java.util.List; import java.util.concurrent.TimeoutException; @@ -139,30 +141,41 @@ public class HttpReceiver implements HttpParser.ResponseHandler response.version(version).status(status).reason(reason); // Probe the protocol handlers - Response.Listener currentListener = exchange.getResponseListener(); - Response.Listener initialListener = conversation.getExchanges().peekFirst().getResponseListener(); + HttpExchange initialExchange = conversation.getExchanges().peekFirst(); HttpClient client = connection.getHttpClient(); ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response); Response.Listener handlerListener = protocolHandler == null ? null : protocolHandler.getResponseListener(); if (handlerListener == null) { exchange.setLast(true); - if (currentListener == initialListener) - conversation.setResponseListener(initialListener); + if (initialExchange == exchange) + { + conversation.setResponseListeners(exchange.getResponseListeners()); + } else - conversation.setResponseListener(new DoubleResponseListener(responseNotifier, currentListener, initialListener)); + { + List listeners = new ArrayList<>(exchange.getResponseListeners()); + listeners.addAll(initialExchange.getResponseListeners()); + conversation.setResponseListeners(listeners); + } } else { LOG.debug("Found protocol handler {}", protocolHandler); - if (currentListener == initialListener) - conversation.setResponseListener(handlerListener); + if (initialExchange == exchange) + { + conversation.setResponseListeners(Collections.singletonList(handlerListener)); + } else - conversation.setResponseListener(new DoubleResponseListener(responseNotifier, currentListener, handlerListener)); + { + List listeners = new ArrayList<>(exchange.getResponseListeners()); + listeners.add(handlerListener); + conversation.setResponseListeners(listeners); + } } LOG.debug("Receiving {}", response); - responseNotifier.notifyBegin(conversation.getResponseListener(), response); + responseNotifier.notifyBegin(conversation.getResponseListeners(), response); } } return false; @@ -212,7 +225,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler HttpConversation conversation = exchange.getConversation(); HttpResponse response = exchange.getResponse(); LOG.debug("Headers {}", response); - responseNotifier.notifyHeaders(conversation.getResponseListener(), response); + responseNotifier.notifyHeaders(conversation.getResponseListeners(), response); Enumeration contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ","); if (contentEncodings != null) @@ -254,7 +267,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining()); } - responseNotifier.notifyContent(conversation.getResponseListener(), response, buffer); + responseNotifier.notifyContent(conversation.getResponseListeners(), response, buffer); } } return false; @@ -287,8 +300,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler exchange.terminateResponse(); HttpResponse response = exchange.getResponse(); - Response.Listener listener = exchange.getConversation().getResponseListener(); - responseNotifier.notifySuccess(listener, response); + List listeners = exchange.getConversation().getResponseListeners(); + responseNotifier.notifySuccess(listeners, response); LOG.debug("Received {}", response); Result result = completion.getReference(); @@ -296,7 +309,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler { connection.complete(exchange, !result.isFailed()); - responseNotifier.notifyComplete(listener, result); + responseNotifier.notifyComplete(listeners, result); } return true; @@ -330,7 +343,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler HttpResponse response = exchange.getResponse(); HttpConversation conversation = exchange.getConversation(); - responseNotifier.notifyFailure(conversation.getResponseListener(), response, failure); + responseNotifier.notifyFailure(conversation.getResponseListeners(), response, failure); LOG.debug("Failed {} {}", response, failure); Result result = completion.getReference(); @@ -338,7 +351,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler { connection.complete(exchange, false); - responseNotifier.notifyComplete(conversation.getResponseListener(), result); + responseNotifier.notifyComplete(conversation.getResponseListeners(), result); } return true; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java index 1dcb21d5b82..7dffe62ac6e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpRequest.java @@ -50,7 +50,8 @@ public class HttpRequest implements Request private final HttpFields headers = new HttpFields(); private final Fields params = new Fields(); private final Map attributes = new HashMap<>(); - private final List listeners = new ArrayList<>(); + private final List requestListeners = new ArrayList<>(); + private final List responseListeners = new ArrayList<>(); private final HttpClient client; private final long conversation; private final String host; @@ -240,10 +241,10 @@ public class HttpRequest implements Request } @Override - public List getListeners(Class type) + public List getRequestListeners(Class type) { ArrayList result = new ArrayList<>(); - for (RequestListener listener : listeners) + for (RequestListener listener : requestListeners) if (type == null || type.isInstance(listener)) result.add((T)listener); return result; @@ -252,42 +253,77 @@ public class HttpRequest implements Request @Override public Request listener(Request.Listener listener) { - this.listeners.add(listener); + this.requestListeners.add(listener); return this; } @Override public Request onRequestQueued(QueuedListener listener) { - this.listeners.add(listener); + this.requestListeners.add(listener); return this; } @Override public Request onRequestBegin(BeginListener listener) { - this.listeners.add(listener); + this.requestListeners.add(listener); return this; } @Override public Request onRequestHeaders(HeadersListener listener) { - this.listeners.add(listener); + this.requestListeners.add(listener); return this; } @Override public Request onRequestSuccess(SuccessListener listener) { - this.listeners.add(listener); + this.requestListeners.add(listener); return this; } @Override public Request onRequestFailure(FailureListener listener) { - this.listeners.add(listener); + this.requestListeners.add(listener); + return this; + } + + @Override + public Request onResponseBegin(Response.BeginListener listener) + { + this.responseListeners.add(listener); + return this; + } + + @Override + public Request onResponseHeaders(Response.HeadersListener listener) + { + this.responseListeners.add(listener); + return this; + } + + @Override + public Request onResponseContent(Response.ContentListener listener) + { + this.responseListeners.add(listener); + return this; + } + + @Override + public Request onResponseSuccess(Response.SuccessListener listener) + { + this.responseListeners.add(listener); + return this; + } + + @Override + public Request onResponseFailure(Response.FailureListener listener) + { + this.responseListeners.add(listener); return this; } @@ -359,9 +395,11 @@ public class HttpRequest implements Request } @Override - public void send(final Response.Listener listener) + public void send(Response.CompleteListener listener) { - client.send(this, listener); + if (listener != null) + responseListeners.add(listener); + client.send(this, responseListeners); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java index 62d1f7a845c..0b49f3be92e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpResponse.java @@ -18,6 +18,9 @@ package org.eclipse.jetty.client; +import java.util.ArrayList; +import java.util.List; + import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.http.HttpFields; @@ -27,15 +30,15 @@ public class HttpResponse implements Response { private final HttpFields headers = new HttpFields(); private final Request request; - private final Listener listener; + private final List listeners; private HttpVersion version; private int status; private String reason; - public HttpResponse(Request request, Listener listener) + public HttpResponse(Request request, List listeners) { this.request = request; - this.listener = listener; + this.listeners = listeners; } public HttpVersion getVersion() @@ -85,9 +88,13 @@ public class HttpResponse implements Response } @Override - public Listener getListener() + public List getListeners(Class type) { - return listener; + ArrayList result = new ArrayList<>(); + for (ResponseListener listener : listeners) + if (type == null || type.isInstance(listener)) + result.add((T)listener); + return result; } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 2966cc6aeda..bd38c9119f6 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -19,8 +19,10 @@ package org.eclipse.jetty.client; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.Collections; import java.util.Iterator; +import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicMarkableReference; @@ -67,12 +69,17 @@ public class HttpSender // Arrange the listeners, so that if there is a request failure the proper listeners are notified HttpConversation conversation = exchange.getConversation(); - Response.Listener currentListener = exchange.getResponseListener(); - Response.Listener initialListener = conversation.getExchanges().peekFirst().getResponseListener(); - if (initialListener == currentListener) - conversation.setResponseListener(initialListener); + HttpExchange initialExchange = conversation.getExchanges().peekFirst(); + if (initialExchange == exchange) + { + conversation.setResponseListeners(exchange.getResponseListeners()); + } else - conversation.setResponseListener(new DoubleResponseListener(responseNotifier, currentListener, initialListener)); + { + List listeners = new ArrayList<>(exchange.getResponseListeners()); + listeners.addAll(initialExchange.getResponseListeners()); + conversation.setResponseListeners(listeners); + } Request request = exchange.getRequest(); if (request.aborted()) @@ -359,7 +366,7 @@ public class HttpSender connection.complete(exchange, !result.isFailed()); HttpConversation conversation = exchange.getConversation(); - responseNotifier.notifyComplete(conversation.getResponseListener(), result); + responseNotifier.notifyComplete(conversation.getResponseListeners(), result); } return true; @@ -405,7 +412,7 @@ public class HttpSender connection.complete(exchange, false); HttpConversation conversation = exchange.getConversation(); - responseNotifier.notifyComplete(conversation.getResponseListener(), result); + responseNotifier.notifyComplete(conversation.getResponseListeners(), result); } return true; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java index b1af9582fc7..82895958ae6 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RedirectProtocolHandler.java @@ -18,10 +18,11 @@ package org.eclipse.jetty.client; +import java.util.List; + import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; -import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpMethod; public class RedirectProtocolHandler extends Response.Listener.Empty implements ProtocolHandler @@ -115,20 +116,11 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements ++redirects; conversation.setAttribute(ATTRIBUTE, redirects); - Request redirect = client.newRequest(request.getConversationID(), location); + Request redirect = client.copyRequest(request, location); // Use given method redirect.method(method); - redirect.version(request.getVersion()); - - // Copy headers - for (HttpFields.Field header : request.getHeaders()) - redirect.header(header.getName(), header.getValue()); - - // Copy content - redirect.content(request.getContent()); - redirect.onRequestBegin(new Request.BeginListener() { @Override @@ -139,7 +131,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements } }); - redirect.send(new Response.Listener.Empty()); + redirect.send(null); } else { @@ -152,9 +144,9 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements Request request = result.getRequest(); Response response = result.getResponse(); HttpConversation conversation = client.getConversation(request.getConversationID(), false); - Response.Listener listener = conversation.getExchanges().peekFirst().getResponseListener(); + List listeners = conversation.getExchanges().peekFirst().getResponseListeners(); // TODO: should we replay all events, or just the failure ? - notifier.notifyFailure(listener, response, failure); - notifier.notifyComplete(listener, new Result(request, response, failure)); + notifier.notifyFailure(listeners, response, failure); + notifier.notifyComplete(listeners, new Result(request, response, failure)); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java index 05ac2e642c6..b04949b4852 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RequestNotifier.java @@ -35,7 +35,7 @@ public class RequestNotifier public void notifyQueued(Request request) { - for (Request.QueuedListener listener : request.getListeners(Request.QueuedListener.class)) + for (Request.QueuedListener listener : request.getRequestListeners(Request.QueuedListener.class)) notifyQueued(listener, request); for (Request.Listener listener : client.getRequestListeners()) notifyQueued(listener, request); @@ -56,7 +56,7 @@ public class RequestNotifier public void notifyBegin(Request request) { - for (Request.BeginListener listener : request.getListeners(Request.BeginListener.class)) + for (Request.BeginListener listener : request.getRequestListeners(Request.BeginListener.class)) notifyBegin(listener, request); for (Request.Listener listener : client.getRequestListeners()) notifyBegin(listener, request); @@ -77,7 +77,7 @@ public class RequestNotifier public void notifyHeaders(Request request) { - for (Request.HeadersListener listener : request.getListeners(Request.HeadersListener.class)) + for (Request.HeadersListener listener : request.getRequestListeners(Request.HeadersListener.class)) notifyHeaders(listener, request); for (Request.Listener listener : client.getRequestListeners()) notifyHeaders(listener, request); @@ -98,7 +98,7 @@ public class RequestNotifier public void notifySuccess(Request request) { - for (Request.SuccessListener listener : request.getListeners(Request.SuccessListener.class)) + for (Request.SuccessListener listener : request.getRequestListeners(Request.SuccessListener.class)) notifySuccess(listener, request); for (Request.Listener listener : client.getRequestListeners()) notifySuccess(listener, request); @@ -119,7 +119,7 @@ public class RequestNotifier public void notifyFailure(Request request, Throwable failure) { - for (Request.FailureListener listener : request.getListeners(Request.FailureListener.class)) + for (Request.FailureListener listener : request.getRequestListeners(Request.FailureListener.class)) notifyFailure(listener, request, failure); for (Request.Listener listener : client.getRequestListeners()) notifyFailure(listener, request, failure); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java index d56a5b36693..a1b64c6d2ee 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/ResponseNotifier.java @@ -19,6 +19,7 @@ package org.eclipse.jetty.client; import java.nio.ByteBuffer; +import java.util.List; import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.Request; @@ -37,12 +38,18 @@ public class ResponseNotifier this.client = client; } - public void notifyBegin(Response.Listener listener, Response response) + public void notifyBegin(List listeners, Response response) + { + for (Response.ResponseListener listener : listeners) + if (listener instanceof Response.BeginListener) + notifyBegin((Response.BeginListener)listener, response); + } + + private void notifyBegin(Response.BeginListener listener, Response response) { try { - if (listener != null) - listener.onBegin(response); + listener.onBegin(response); } catch (Exception x) { @@ -50,12 +57,18 @@ public class ResponseNotifier } } - public void notifyHeaders(Response.Listener listener, Response response) + public void notifyHeaders(List listeners, Response response) + { + for (Response.ResponseListener listener : listeners) + if (listener instanceof Response.HeadersListener) + notifyHeaders((Response.HeadersListener)listener, response); + } + + private void notifyHeaders(Response.HeadersListener listener, Response response) { try { - if (listener != null) - listener.onHeaders(response); + listener.onHeaders(response); } catch (Exception x) { @@ -63,12 +76,19 @@ public class ResponseNotifier } } - public void notifyContent(Response.Listener listener, Response response, ByteBuffer buffer) + public void notifyContent(List listeners, Response response, ByteBuffer buffer) + { + for (Response.ResponseListener listener : listeners) + if (listener instanceof Response.ContentListener) + notifyContent((Response.ContentListener)listener, response, buffer); + + } + + private void notifyContent(Response.ContentListener listener, Response response, ByteBuffer buffer) { try { - if (listener != null) - listener.onContent(response, buffer); + listener.onContent(response, buffer); } catch (Exception x) { @@ -76,12 +96,18 @@ public class ResponseNotifier } } - public void notifySuccess(Response.Listener listener, Response response) + public void notifySuccess(List listeners, Response response) + { + for (Response.ResponseListener listener : listeners) + if (listener instanceof Response.SuccessListener) + notifySuccess((Response.SuccessListener)listener, response); + } + + private void notifySuccess(Response.SuccessListener listener, Response response) { try { - if (listener != null) - listener.onSuccess(response); + listener.onSuccess(response); } catch (Exception x) { @@ -89,12 +115,18 @@ public class ResponseNotifier } } - public void notifyFailure(Response.Listener listener, Response response, Throwable failure) + public void notifyFailure(List listeners, Response response, Throwable failure) + { + for (Response.ResponseListener listener : listeners) + if (listener instanceof Response.FailureListener) + notifyFailure((Response.FailureListener)listener, response, failure); + } + + private void notifyFailure(Response.FailureListener listener, Response response, Throwable failure) { try { - if (listener != null) - listener.onFailure(response, failure); + listener.onFailure(response, failure); } catch (Exception x) { @@ -102,12 +134,18 @@ public class ResponseNotifier } } - public void notifyComplete(Response.Listener listener, Result result) + public void notifyComplete(List listeners, Result result) + { + for (Response.ResponseListener listener : listeners) + if (listener instanceof Response.CompleteListener) + notifyComplete((Response.CompleteListener)listener, result); + } + + private void notifyComplete(Response.CompleteListener listener, Result result) { try { - if (listener != null) - listener.onComplete(result); + listener.onComplete(result); } catch (Exception x) { @@ -115,37 +153,37 @@ public class ResponseNotifier } } - public void forwardSuccess(Response.Listener listener, Response response) + public void forwardSuccess(List listeners, Response response) { - notifyBegin(listener, response); - notifyHeaders(listener, response); + notifyBegin(listeners, response); + notifyHeaders(listeners, response); if (response instanceof ContentResponse) - notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).getContent())); - notifySuccess(listener, response); + notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent())); + notifySuccess(listeners, response); } - public void forwardSuccessComplete(Response.Listener listener, Request request, Response response) + public void forwardSuccessComplete(List listeners, Request request, Response response) { HttpConversation conversation = client.getConversation(request.getConversationID(), false); - forwardSuccess(listener, response); + forwardSuccess(listeners, response); conversation.complete(); - notifyComplete(listener, new Result(request, response)); + notifyComplete(listeners, new Result(request, response)); } - public void forwardFailure(Response.Listener listener, Response response, Throwable failure) + public void forwardFailure(List listeners, Response response, Throwable failure) { - notifyBegin(listener, response); - notifyHeaders(listener, response); + notifyBegin(listeners, response); + notifyHeaders(listeners, response); if (response instanceof ContentResponse) - notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).getContent())); - notifyFailure(listener, response, failure); + notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent())); + notifyFailure(listeners, response, failure); } - public void forwardFailureComplete(Response.Listener listener, Request request, Throwable requestFailure, Response response, Throwable responseFailure) + public void forwardFailureComplete(List listeners, Request request, Throwable requestFailure, Response response, Throwable responseFailure) { HttpConversation conversation = client.getConversation(request.getConversationID(), false); - forwardFailure(listener, response, responseFailure); + forwardFailure(listeners, response, responseFailure); conversation.complete(); - notifyComplete(listener, new Result(request, requestFailure, response, responseFailure)); + notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure)); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Connection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Connection.java index 23d963ca5c8..64b719447e2 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Connection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Connection.java @@ -37,7 +37,7 @@ public interface Connection extends AutoCloseable * @param request the request to send * @param listener the response listener */ - void send(Request request, Response.Listener listener); + void send(Request request, Response.CompleteListener listener); @Override void close(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java index 6b271c6856e..681bfe81423 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Request.java @@ -210,44 +210,74 @@ public interface Request * @param listenerClass the class of the listener, or null for all listeners classes * @return the listeners for request events of the given class */ - List getListeners(Class listenerClass); + List getRequestListeners(Class listenerClass); /** - * @param listener the listener for request events + * @param listener a listener for request events * @return this request object */ Request listener(Listener listener); /** - * @param listener the listener for request queued events + * @param listener a listener for request queued event * @return this request object */ Request onRequestQueued(QueuedListener listener); /** - * @param listener the listener for request begin events + * @param listener a listener for request begin event * @return this request object */ Request onRequestBegin(BeginListener listener); /** - * @param listener the listener for request headers events + * @param listener a listener for request headers event * @return this request object */ Request onRequestHeaders(HeadersListener listener); /** - * @param listener the listener for request headers events + * @param listener a listener for request success event * @return this request object */ Request onRequestSuccess(SuccessListener listener); /** - * @param listener the listener for request headers events + * @param listener a listener for request failure event * @return this request object */ Request onRequestFailure(FailureListener listener); + /** + * @param listener a listener for response begin event + * @return this request object + */ + Request onResponseBegin(Response.BeginListener listener); + + /** + * @param listener a listener for response headers event + * @return this request object + */ + Request onResponseHeaders(Response.HeadersListener listener); + + /** + * @param listener a listener for response content events + * @return this request object + */ + Request onResponseContent(Response.ContentListener listener); + + /** + * @param listener a listener for response success event + * @return this request object + */ + Request onResponseSuccess(Response.SuccessListener listener); + + /** + * @param listener a listener for response failure event + * @return this request object + */ + Request onResponseFailure(Response.FailureListener listener); + /** * Sends this request and returns a {@link Future} that can be used to wait for the * request and the response to be completed (either with a success or a failure). @@ -273,7 +303,7 @@ public interface Request * * @param listener the listener that receives response events */ - void send(Response.Listener listener); + void send(Response.CompleteListener listener); /** * Attempts to abort the send of this request. diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java index 72c97c7c741..951d4853f66 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/api/Response.java @@ -19,6 +19,8 @@ package org.eclipse.jetty.client.api; import java.nio.ByteBuffer; +import java.util.EventListener; +import java.util.List; import org.eclipse.jetty.client.util.BufferingResponseListener; import org.eclipse.jetty.http.HttpFields; @@ -42,9 +44,9 @@ public interface Response long getConversationID(); /** - * @return the response listener passed to {@link Request#send(Listener)} + * @return the response listener passed to {@link Request#send(CompleteListener)} */ - Listener getListener(); + List getListeners(Class listenerClass); /** * @return the HTTP version of this response, such as "HTTP/1.1" @@ -74,10 +76,11 @@ public interface Response */ boolean abort(String reason); - /** - * Listener for response events - */ - public interface Listener + public interface ResponseListener extends EventListener + { + } + + public interface BeginListener extends ResponseListener { /** * Callback method invoked when the response line containing HTTP version, @@ -88,14 +91,20 @@ public interface Response * @param response the response containing the response line data */ public void onBegin(Response response); + } + public interface HeadersListener extends ResponseListener + { /** * Callback method invoked when the response headers have been received and parsed. * * @param response the response containing the response line data and the headers */ public void onHeaders(Response response); + } + public interface ContentListener extends ResponseListener + { /** * Callback method invoked when the response content has been received. * This method may be invoked multiple times, and the {@code content} buffer must be consumed @@ -105,14 +114,20 @@ public interface Response * @param content the content bytes received */ public void onContent(Response response, ByteBuffer content); + } + public interface SuccessListener extends ResponseListener + { /** * Callback method invoked when the whole response has been successfully received. * * @param response the response containing the response line data and the headers */ public void onSuccess(Response response); + } + public interface FailureListener extends ResponseListener + { /** * Callback method invoked when the response has failed in the process of being received * @@ -120,7 +135,10 @@ public interface Response * @param failure the failure happened */ public void onFailure(Response response, Throwable failure); + } + public interface CompleteListener extends ResponseListener + { /** * Callback method invoked when the request and the response have been processed, * either successfully or not. @@ -129,13 +147,20 @@ public interface Response *

* Requests may complete after response, for example in case of big uploads that are * discarded or read asynchronously by the server. - * This method is always invoked after {@link #onSuccess(Response)} or - * {@link #onFailure(Response, Throwable)}, and only when request indicates that it is completed. + * This method is always invoked after {@link SuccessListener#onSuccess(Response)} or + * {@link FailureListener#onFailure(Response, Throwable)}, and only when request indicates that + * it is completed. * * @param result the result of the request / response exchange */ public void onComplete(Result result); + } + /** + * Listener for response events + */ + public interface Listener extends BeginListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener + { /** * An empty implementation of {@link Listener} */ diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java index 12ec25d6a68..bef0fbf305e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BlockingResponseListener.java @@ -46,7 +46,6 @@ public class BlockingResponseListener extends BufferingResponseListener implemen @Override public void onComplete(Result result) { - super.onComplete(result); response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding()); failure = result.getFailure(); latch.countDown(); diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java index 1c4a7aa5e49..1eba12ed74e 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/util/BufferingResponseListener.java @@ -33,7 +33,7 @@ import org.eclipse.jetty.http.HttpHeader; *

The content may be retrieved from {@link #onSuccess(Response)} or {@link #onComplete(Result)} * via {@link #getContent()} or {@link #getContentAsString()}.

*/ -public class BufferingResponseListener extends Response.Listener.Empty +public abstract class BufferingResponseListener extends Response.Listener.Empty { private final int maxLength; private volatile byte[] buffer = new byte[0]; @@ -95,6 +95,9 @@ public class BufferingResponseListener extends Response.Listener.Empty buffer = newBuffer; } + @Override + public abstract void onComplete(Result result); + public String getEncoding() { return encoding; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java index 4358138efc5..ac4eac505ac 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientAuthenticationTest.java @@ -99,14 +99,16 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest public void test_BasicAuthentication() throws Exception { startBasic(new EmptyServerHandler()); - test_Authentication(new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic")); + String uri = scheme + "://localhost:" + connector.getLocalPort(); + test_Authentication(new BasicAuthentication(uri, realm, "basic", "basic")); } @Test public void test_DigestAuthentication() throws Exception { startDigest(new EmptyServerHandler()); - test_Authentication(new DigestAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "digest", "digest")); + String uri = scheme + "://localhost:" + connector.getLocalPort(); + test_Authentication(new DigestAuthentication(uri, realm, "digest", "digest")); } private void test_Authentication(Authentication authentication) throws Exception @@ -189,7 +191,8 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest } }); - client.getAuthenticationStore().addAuthentication(new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic")); + String uri = scheme + "://localhost:" + connector.getLocalPort(); + client.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, "basic", "basic")); final CountDownLatch requests = new CountDownLatch(3); Request.Listener.Empty requestListener = new Request.Listener.Empty() @@ -227,7 +230,8 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest } }); - client.getAuthenticationStore().addAuthentication(new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic")); + String uri = scheme + "://localhost:" + connector.getLocalPort(); + client.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, "basic", "basic")); final CountDownLatch requests = new CountDownLatch(3); Request.Listener.Empty requestListener = new Request.Listener.Empty() @@ -268,7 +272,8 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest client.getRequestListeners().add(requestListener); AuthenticationStore authenticationStore = client.getAuthenticationStore(); - BasicAuthentication authentication = new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic"); + String uri = scheme + "://localhost:" + connector.getLocalPort(); + BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic"); authenticationStore.addAuthentication(authentication); Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme).path("/secure"); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java index 5604bc53595..54cc78811de 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientLoadTest.java @@ -67,7 +67,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest client.setMaxQueueSizePerAddress(1024 * 1024); client.setDispatchIO(false); - Random random = new Random(1000L); + Random random = new Random(); int iterations = 500; CountDownLatch latch = new CountDownLatch(iterations); List failures = new ArrayList<>(); @@ -132,15 +132,15 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest else if (!ssl && random.nextBoolean()) request.header("X-Close", "true"); + int contentLength = random.nextInt(maxContentLength) + 1; switch (method) { case GET: // Randomly ask the server to download data upon this GET request if (random.nextBoolean()) - request.header("X-Download", String.valueOf(random.nextInt(maxContentLength) + 1)); + request.header("X-Download", String.valueOf(contentLength)); break; case POST: - int contentLength = random.nextInt(maxContentLength) + 1; request.header("X-Upload", String.valueOf(contentLength)); request.content(new BytesContentProvider(new byte[contentLength])); break; diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java index e525e4a6084..5ffc74a0679 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpClientTest.java @@ -371,18 +371,19 @@ public class HttpClientTest extends AbstractHttpClientServerTest latch.countDown(); } }) - .send(new Response.Listener.Empty() + .onResponseFailure(new Response.FailureListener() { @Override public void onFailure(Response response, Throwable failure) { latch.countDown(); } - }); + }) + .send(null); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .send(new Response.Listener.Empty() + .onResponseSuccess(new Response.SuccessListener() { @Override public void onSuccess(Response response) @@ -390,7 +391,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest Assert.assertEquals(200, response.getStatus()); latch.countDown(); } - }); + }) + .send(null); Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS)); } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java index 2d08bb590d0..b0aa3b6135e 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpConnectionLifecycleTest.java @@ -76,7 +76,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest successLatch.countDown(); } }) - .send(new Response.Listener.Empty() + .onResponseHeaders(new Response.HeadersListener() { @Override public void onHeaders(Response response) @@ -85,7 +85,9 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest Assert.assertEquals(1, activeConnections.size()); headersLatch.countDown(); } - + }) + .send(new Response.Listener.Empty() + { @Override public void onSuccess(Response response) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java index 6c9487780aa..aa7948e925c 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpReceiverTest.java @@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream; import java.io.EOFException; import java.net.URI; import java.nio.ByteBuffer; +import java.util.Collections; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -73,7 +74,7 @@ public class HttpReceiverTest { HttpRequest request = new HttpRequest(client, URI.create("http://localhost")); BlockingResponseListener listener = new BlockingResponseListener(request); - HttpExchange exchange = new HttpExchange(conversation, connection, request, listener); + HttpExchange exchange = new HttpExchange(conversation, connection, request, Collections.singletonList(listener)); conversation.getExchanges().offer(exchange); connection.setExchange(exchange); exchange.requestComplete(null); @@ -89,7 +90,7 @@ public class HttpReceiverTest "Content-length: 0\r\n" + "\r\n"); HttpExchange exchange = newExchange(); - BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener(); + BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0); exchange.receive(); Response response = listener.get(5, TimeUnit.SECONDS); @@ -113,7 +114,7 @@ public class HttpReceiverTest "\r\n" + content); HttpExchange exchange = newExchange(); - BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener(); + BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0); exchange.receive(); Response response = listener.get(5, TimeUnit.SECONDS); @@ -140,7 +141,7 @@ public class HttpReceiverTest "\r\n" + content1); HttpExchange exchange = newExchange(); - BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener(); + BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0); exchange.receive(); endPoint.setInputEOF(); exchange.receive(); @@ -164,7 +165,7 @@ public class HttpReceiverTest "Content-length: 1\r\n" + "\r\n"); HttpExchange exchange = newExchange(); - BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener(); + BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0); exchange.receive(); // Simulate an idle timeout connection.idleTimeout(); @@ -188,7 +189,7 @@ public class HttpReceiverTest "Content-length: A\r\n" + "\r\n"); HttpExchange exchange = newExchange(); - BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener(); + BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0); exchange.receive(); try @@ -219,7 +220,7 @@ public class HttpReceiverTest "Content-Encoding: gzip\r\n" + "\r\n"); HttpExchange exchange = newExchange(); - BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener(); + BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0); exchange.receive(); endPoint.reset(); diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java index 4b16b688bc5..74efb0cd14c 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java @@ -55,14 +55,16 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .send(new Response.Listener.Empty() + .onResponseBegin(new Response.BeginListener() { @Override public void onBegin(Response response) { response.abort(null); } - + }) + .send(new Response.CompleteListener() + { @Override public void onComplete(Result result) { @@ -81,13 +83,16 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .send(new Response.Listener.Empty() + .onResponseHeaders(new Response.HeadersListener() { @Override public void onHeaders(Response response) { response.abort(null); } + }) + .send(new Response.CompleteListener() + { @Override public void onComplete(Result result) @@ -126,14 +131,16 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest final CountDownLatch latch = new CountDownLatch(1); client.newRequest("localhost", connector.getLocalPort()) .scheme(scheme) - .send(new Response.Listener.Empty() + .onResponseContent(new Response.ContentListener() { @Override public void onContent(Response response, ByteBuffer content) { response.abort(null); } - + }) + .send(new Response.CompleteListener() + { @Override public void onComplete(Result result) { diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java index 8a640b50848..25136c830a8 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpSenderTest.java @@ -79,7 +79,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(request, null); + connection.send(request, (Response.CompleteListener)null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -96,7 +96,7 @@ public class HttpSenderTest HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080); HttpConnection connection = new HttpConnection(client, endPoint, destination); Request request = client.newRequest(URI.create("http://localhost/")); - connection.send(request, null); + connection.send(request, (Response.CompleteListener)null); // This take will free space in the buffer and allow for the write to complete StringBuilder builder = new StringBuilder(endPoint.takeOutputString()); @@ -126,7 +126,7 @@ public class HttpSenderTest HttpConnection connection = new HttpConnection(client, endPoint, destination); Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch failureLatch = new CountDownLatch(2); - request.onRequestFailure(new Request.FailureListener() + request.listener(new Request.Listener.Empty() { @Override public void onFailure(Request request, Throwable x) @@ -155,7 +155,7 @@ public class HttpSenderTest HttpConnection connection = new HttpConnection(client, endPoint, destination); Request request = client.newRequest(URI.create("http://localhost/")); final CountDownLatch failureLatch = new CountDownLatch(2); - request.onRequestFailure(new Request.FailureListener() + request.listener(new Request.Listener.Empty() { @Override public void onFailure(Request request, Throwable x) @@ -207,7 +207,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(request, null); + connection.send(request, (Response.CompleteListener)null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -242,7 +242,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(request, null); + connection.send(request, (Response.CompleteListener)null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET ")); @@ -284,7 +284,7 @@ public class HttpSenderTest successLatch.countDown(); } }); - connection.send(request, null); + connection.send(request, (Response.CompleteListener)null); String requestString = endPoint.takeOutputString(); Assert.assertTrue(requestString.startsWith("GET "));