HTTP client: refactored response listeners to support lambdas.

This commit is contained in:
Simone Bordet 2012-10-30 19:21:49 +01:00
parent 8d51961516
commit 0d762bcdbc
28 changed files with 424 additions and 304 deletions

View File

@ -80,13 +80,13 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
{
Request request = result.getRequest();
HttpConversation conversation = client.getConversation(request.getConversationID(), false);
Response.Listener listener = conversation.getExchanges().peekFirst().getResponseListener();
List<Response.ResponseListener> listeners = conversation.getExchanges().peekFirst().getResponseListeners();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding());
if (result.isFailed())
{
Throwable failure = result.getFailure();
LOG.debug("Authentication challenge failed {}", failure);
notifier.forwardFailureComplete(listener, request, result.getRequestFailure(), response, result.getResponseFailure());
notifier.forwardFailureComplete(listeners, request, result.getRequestFailure(), response, result.getResponseFailure());
return;
}
@ -94,7 +94,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
if (wwwAuthenticates.isEmpty())
{
LOG.debug("Authentication challenge without WWW-Authenticate header");
notifier.forwardFailureComplete(listener, request, null, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response));
notifier.forwardFailureComplete(listeners, request, null, response, new HttpResponseException("HTTP protocol violation: 401 without WWW-Authenticate header", response));
return;
}
@ -113,7 +113,7 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
if (authentication == null)
{
LOG.debug("No authentication available for {}", request);
notifier.forwardSuccessComplete(listener, request, response);
notifier.forwardSuccessComplete(listeners, request, response);
return;
}
@ -121,19 +121,20 @@ public class AuthenticationProtocolHandler implements ProtocolHandler
LOG.debug("Authentication result {}", authnResult);
if (authnResult == null)
{
notifier.forwardSuccessComplete(listener, request, response);
notifier.forwardSuccessComplete(listeners, request, response);
return;
}
authnResult.apply(request);
request.send(new Response.Listener.Empty()
Request newRequest = client.copyRequest(request, request.getURI());
authnResult.apply(newRequest);
newRequest.onResponseSuccess(new Response.SuccessListener()
{
@Override
public void onSuccess(Response response)
{
client.getAuthenticationStore().addAuthenticationResult(authnResult);
}
});
}).send(null);
}
private List<WWWAuthenticate> parseWWWAuthenticate(Response response)

View File

@ -18,8 +18,11 @@
package org.eclipse.jetty.client;
import java.util.List;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpHeaderValue;
@ -67,14 +70,14 @@ public class ContinueProtocolHandler implements ProtocolHandler
HttpExchange exchange = conversation.getExchanges().peekLast();
assert exchange.getResponse() == response;
Response.Listener listener = exchange.getResponseListener();
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
switch (response.getStatus())
{
case 100:
{
// All good, continue
exchange.resetResponse(true);
conversation.setResponseListener(listener);
conversation.setResponseListeners(listeners);
exchange.proceed(true);
break;
}
@ -82,8 +85,8 @@ public class ContinueProtocolHandler implements ProtocolHandler
{
// Server either does not support 100 Continue, or it does and wants to refuse the request content
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
notifier.forwardSuccess(listener, contentResponse);
conversation.setResponseListener(listener);
notifier.forwardSuccess(listeners, contentResponse);
conversation.setResponseListeners(listeners);
exchange.proceed(false);
break;
}
@ -99,9 +102,14 @@ public class ContinueProtocolHandler implements ProtocolHandler
HttpExchange exchange = conversation.getExchanges().peekLast();
assert exchange.getResponse() == response;
Response.Listener listener = exchange.getResponseListener();
List<Response.ResponseListener> listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
notifier.forwardFailureComplete(listener, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure);
notifier.forwardFailureComplete(listeners, exchange.getRequest(), exchange.getRequestFailure(), contentResponse, failure);
}
@Override
public void onComplete(Result result)
{
}
}
}

View File

@ -1,80 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
public class DoubleResponseListener implements Response.Listener
{
private final ResponseNotifier responseNotifier;
private final Response.Listener listener1;
private final Response.Listener listener2;
public DoubleResponseListener(ResponseNotifier responseNotifier, Response.Listener listener1, Response.Listener listener2)
{
this.responseNotifier = responseNotifier;
this.listener1 = listener1;
this.listener2 = listener2;
}
@Override
public void onBegin(Response response)
{
responseNotifier.notifyBegin(listener1, response);
responseNotifier.notifyBegin(listener2, response);
}
@Override
public void onHeaders(Response response)
{
responseNotifier.notifyHeaders(listener1, response);
responseNotifier.notifyHeaders(listener2, response);
}
@Override
public void onContent(Response response, ByteBuffer content)
{
responseNotifier.notifyContent(listener1, response, content);
responseNotifier.notifyContent(listener2, response, content);
}
@Override
public void onSuccess(Response response)
{
responseNotifier.notifySuccess(listener1, response);
responseNotifier.notifySuccess(listener2, response);
}
@Override
public void onFailure(Response response, Throwable failure)
{
responseNotifier.notifyFailure(listener1, response, failure);
responseNotifier.notifyFailure(listener2, response, failure);
}
@Override
public void onComplete(Result result)
{
responseNotifier.notifyComplete(listener1, result);
responseNotifier.notifyComplete(listener2, result);
}
}

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.client.api.CookieStore;
import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
@ -265,9 +266,15 @@ public class HttpClient extends ContainerLifeCycle
return new HttpRequest(this, uri);
}
protected Request newRequest(long id, String uri)
protected Request copyRequest(Request oldRequest, String newURI)
{
return new HttpRequest(this, id, URI.create(uri));
Request newRequest = new HttpRequest(this, oldRequest.getConversationID(), URI.create(newURI));
newRequest.method(oldRequest.getMethod())
.version(oldRequest.getVersion())
.content(oldRequest.getContent());
for (HttpFields.Field header : oldRequest.getHeaders())
newRequest.header(header.getName(), header.getValue());
return newRequest;
}
private String address(String scheme, String host, int port)
@ -307,7 +314,7 @@ public class HttpClient extends ContainerLifeCycle
return new ArrayList<Destination>(destinations.values());
}
protected void send(final Request request, Response.Listener listener)
protected void send(final Request request, List<Response.ResponseListener> listeners)
{
String scheme = request.getScheme().toLowerCase();
if (!Arrays.asList("http", "https").contains(scheme))
@ -317,11 +324,12 @@ public class HttpClient extends ContainerLifeCycle
if (port < 0)
port = "https".equals(scheme) ? 443 : 80;
if (listener instanceof Schedulable)
((Schedulable)listener).schedule(scheduler);
for (Response.ResponseListener listener : listeners)
if (listener instanceof Schedulable)
((Schedulable)listener).schedule(scheduler);
HttpDestination destination = provideDestination(scheme, request.getHost(), port);
destination.send(request, listener);
destination.send(request, listeners);
}
protected void newConnection(HttpDestination destination, Callback<Connection> callback)

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.UnsupportedCharsetException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.Iterator;
import java.util.List;
@ -102,7 +103,12 @@ public class HttpConnection extends AbstractConnection implements Connection
}
@Override
public void send(Request request, Response.Listener listener)
public void send(Request request, Response.CompleteListener listener)
{
send(request, Collections.<Response.ResponseListener>singletonList(listener));
}
public void send(Request request, List<Response.ResponseListener> listeners)
{
normalizeRequest(request);
@ -112,12 +118,13 @@ public class HttpConnection extends AbstractConnection implements Connection
endPoint.setIdleTimeout(request.getIdleTimeout());
HttpConversation conversation = client.getConversation(request.getConversationID(), true);
HttpExchange exchange = new HttpExchange(conversation, this, request, listener);
HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
setExchange(exchange);
conversation.getExchanges().offer(exchange);
if (listener instanceof Schedulable)
((Schedulable)listener).schedule(client.getScheduler());
for (Response.ResponseListener listener : listeners)
if (listener instanceof Schedulable)
((Schedulable)listener).schedule(client.getScheduler());
sender.send(exchange);
}

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.client;
import java.io.UnsupportedEncodingException;
import java.nio.charset.UnsupportedCharsetException;
import java.util.List;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Response;
@ -46,9 +47,9 @@ public class HttpContentResponse implements ContentResponse
}
@Override
public Listener getListener()
public <T extends ResponseListener> List<T> getListeners(Class<T> listenerClass)
{
return response.getListener();
return response.getListeners(listenerClass);
}
@Override

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.client;
import java.util.Collections;
import java.util.Deque;
import java.util.Enumeration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedDeque;
@ -34,7 +35,7 @@ public class HttpConversation implements Attributes
private final Deque<HttpExchange> exchanges = new ConcurrentLinkedDeque<>();
private final HttpClient client;
private final long id;
private volatile Response.Listener listener;
private volatile List<Response.ResponseListener> listeners;
public HttpConversation(HttpClient client, long id)
{
@ -52,14 +53,14 @@ public class HttpConversation implements Attributes
return exchanges;
}
public Response.Listener getResponseListener()
public List<Response.ResponseListener> getResponseListeners()
{
return listener;
return listeners;
}
public void setResponseListener(Response.Listener listener)
public void setResponseListeners(List<Response.ResponseListener> listeners)
{
this.listener = listener;
this.listeners = listeners;
}
public void complete()

View File

@ -50,7 +50,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
private final String scheme;
private final String host;
private final int port;
private final Queue<RequestPair> requests;
private final Queue<RequestContext> requests;
private final BlockingQueue<Connection> idleConnections;
private final BlockingQueue<Connection> activeConnections;
private final RequestNotifier requestNotifier;
@ -97,7 +97,7 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
return port;
}
public void send(Request request, Response.Listener listener)
public void send(Request request, List<Response.ResponseListener> listeners)
{
if (!scheme.equals(request.getScheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
@ -107,12 +107,12 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
if (port >= 0 && this.port != port)
throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
RequestPair requestPair = new RequestPair(request, listener);
RequestContext requestContext = new RequestContext(request, listeners);
if (client.isRunning())
{
if (requests.offer(requestPair))
if (requests.offer(requestContext))
{
if (!client.isRunning() && requests.remove(requestPair))
if (!client.isRunning() && requests.remove(requestContext))
{
throw new RejectedExecutionException(client + " is stopping");
}
@ -202,15 +202,15 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
private void drain(Throwable x)
{
RequestPair pair;
while ((pair = requests.poll()) != null)
RequestContext requestContext;
while ((requestContext = requests.poll()) != null)
{
Request request = pair.request;
Request request = requestContext.request;
requestNotifier.notifyFailure(request, x);
Response.Listener listener = pair.listener;
HttpResponse response = new HttpResponse(request, listener);
responseNotifier.notifyFailure(listener, response, x);
responseNotifier.notifyComplete(listener, new Result(request, x, response, x));
List<Response.ResponseListener> listeners = requestContext.listeners;
HttpResponse response = new HttpResponse(request, listeners);
responseNotifier.notifyFailure(listeners, response, x);
responseNotifier.notifyComplete(listeners, new Result(request, x, response, x));
}
}
@ -223,37 +223,40 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
*
* @param connection the new connection
*/
protected void process(final Connection connection, boolean dispatch)
protected void process(Connection connection, boolean dispatch)
{
RequestPair requestPair = requests.poll();
if (requestPair == null)
// Ugly cast, but lack of generic reification forces it
final HttpConnection httpConnection = (HttpConnection)connection;
RequestContext requestContext = requests.poll();
if (requestContext == null)
{
LOG.debug("{} idle", connection);
if (!idleConnections.offer(connection))
LOG.debug("{} idle", httpConnection);
if (!idleConnections.offer(httpConnection))
{
LOG.debug("{} idle overflow");
connection.close();
httpConnection.close();
}
if (!client.isRunning())
{
LOG.debug("{} is stopping", client);
remove(connection);
connection.close();
remove(httpConnection);
httpConnection.close();
}
}
else
{
final Request request = requestPair.request;
final Response.Listener listener = requestPair.listener;
final Request request = requestContext.request;
final List<Response.ResponseListener> listeners = requestContext.listeners;
if (request.aborted())
{
abort(request, listener, "Aborted");
abort(request, listeners, "Aborted");
LOG.debug("Aborted {} before processing", request);
}
else
{
LOG.debug("{} active", connection);
if (!activeConnections.offer(connection))
LOG.debug("{} active", httpConnection);
if (!activeConnections.offer(httpConnection))
{
LOG.warn("{} active overflow");
}
@ -264,13 +267,13 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
@Override
public void run()
{
connection.send(request, listener);
httpConnection.send(request, listeners);
}
});
}
else
{
connection.send(request, listener);
httpConnection.send(request, listeners);
}
}
}
@ -333,14 +336,14 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
public boolean abort(Request request, String reason)
{
for (RequestPair pair : requests)
for (RequestContext requestContext : requests)
{
if (pair.request == request)
if (requestContext.request == request)
{
if (requests.remove(pair))
if (requests.remove(requestContext))
{
// We were able to remove the pair, so it won't be processed
abort(request, pair.listener, reason);
abort(request, requestContext.listeners, reason);
LOG.debug("Aborted {} while queued", request);
return true;
}
@ -349,13 +352,13 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
return false;
}
private void abort(Request request, Response.Listener listener, String reason)
private void abort(Request request, List<Response.ResponseListener> listeners, String reason)
{
HttpResponse response = new HttpResponse(request, listener);
HttpResponse response = new HttpResponse(request, listeners);
HttpResponseException responseFailure = new HttpResponseException(reason, response);
responseNotifier.notifyFailure(listener, response, responseFailure);
responseNotifier.notifyFailure(listeners, response, responseFailure);
HttpRequestException requestFailure = new HttpRequestException(reason, request);
responseNotifier.notifyComplete(listener, new Result(request, requestFailure, response, responseFailure));
responseNotifier.notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
@Override
@ -382,15 +385,15 @@ public class HttpDestination implements Destination, AutoCloseable, Dumpable
return String.format("%s(%s://%s:%d)", HttpDestination.class.getSimpleName(), getScheme(), getHost(), getPort());
}
private static class RequestPair
private static class RequestContext
{
private final Request request;
private final Response.Listener listener;
private final List<Response.ResponseListener> listeners;
private RequestPair(Request request, Response.Listener listener)
private RequestContext(Request request, List<Response.ResponseListener> listeners)
{
this.request = request;
this.listener = listener;
this.listeners = listeners;
}
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.client;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicMarkableReference;
@ -37,19 +38,19 @@ public class HttpExchange
private final HttpConversation conversation;
private final HttpConnection connection;
private final Request request;
private final Response.Listener listener;
private final List<Response.ResponseListener> listeners;
private final HttpResponse response;
private volatile boolean last;
private volatile Throwable requestFailure;
private volatile Throwable responseFailure;
public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, Response.Listener listener)
public HttpExchange(HttpConversation conversation, HttpConnection connection, Request request, List<Response.ResponseListener> listeners)
{
this.conversation = conversation;
this.connection = connection;
this.request = request;
this.listener = listener;
this.response = new HttpResponse(request, listener);
this.listeners = listeners;
this.response = new HttpResponse(request, listeners);
}
public HttpConversation getConversation()
@ -67,9 +68,9 @@ public class HttpExchange
return requestFailure;
}
public Response.Listener getResponseListener()
public List<Response.ResponseListener> getResponseListeners()
{
return listener;
return listeners;
}
public HttpResponse getResponse()
@ -179,9 +180,10 @@ public class HttpExchange
if (isLast())
{
HttpExchange first = conversation.getExchanges().peekFirst();
Response.Listener listener = first.getResponseListener();
if (listener instanceof Schedulable)
((Schedulable)listener).cancel();
List<Response.ResponseListener> listeners = first.getResponseListeners();
for (Response.ResponseListener listener : listeners)
if (listener instanceof Schedulable)
((Schedulable)listener).cancel();
conversation.complete();
}
}

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.client;
import java.io.EOFException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Enumeration;
import java.util.List;
import java.util.concurrent.TimeoutException;
@ -139,30 +141,41 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
response.version(version).status(status).reason(reason);
// Probe the protocol handlers
Response.Listener currentListener = exchange.getResponseListener();
Response.Listener initialListener = conversation.getExchanges().peekFirst().getResponseListener();
HttpExchange initialExchange = conversation.getExchanges().peekFirst();
HttpClient client = connection.getHttpClient();
ProtocolHandler protocolHandler = client.findProtocolHandler(exchange.getRequest(), response);
Response.Listener handlerListener = protocolHandler == null ? null : protocolHandler.getResponseListener();
if (handlerListener == null)
{
exchange.setLast(true);
if (currentListener == initialListener)
conversation.setResponseListener(initialListener);
if (initialExchange == exchange)
{
conversation.setResponseListeners(exchange.getResponseListeners());
}
else
conversation.setResponseListener(new DoubleResponseListener(responseNotifier, currentListener, initialListener));
{
List<Response.ResponseListener> listeners = new ArrayList<>(exchange.getResponseListeners());
listeners.addAll(initialExchange.getResponseListeners());
conversation.setResponseListeners(listeners);
}
}
else
{
LOG.debug("Found protocol handler {}", protocolHandler);
if (currentListener == initialListener)
conversation.setResponseListener(handlerListener);
if (initialExchange == exchange)
{
conversation.setResponseListeners(Collections.<Response.ResponseListener>singletonList(handlerListener));
}
else
conversation.setResponseListener(new DoubleResponseListener(responseNotifier, currentListener, handlerListener));
{
List<Response.ResponseListener> listeners = new ArrayList<>(exchange.getResponseListeners());
listeners.add(handlerListener);
conversation.setResponseListeners(listeners);
}
}
LOG.debug("Receiving {}", response);
responseNotifier.notifyBegin(conversation.getResponseListener(), response);
responseNotifier.notifyBegin(conversation.getResponseListeners(), response);
}
}
return false;
@ -212,7 +225,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
HttpConversation conversation = exchange.getConversation();
HttpResponse response = exchange.getResponse();
LOG.debug("Headers {}", response);
responseNotifier.notifyHeaders(conversation.getResponseListener(), response);
responseNotifier.notifyHeaders(conversation.getResponseListeners(), response);
Enumeration<String> contentEncodings = response.getHeaders().getValues(HttpHeader.CONTENT_ENCODING.asString(), ",");
if (contentEncodings != null)
@ -254,7 +267,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
LOG.debug("{} {}: {} bytes", decoder, response, buffer.remaining());
}
responseNotifier.notifyContent(conversation.getResponseListener(), response, buffer);
responseNotifier.notifyContent(conversation.getResponseListeners(), response, buffer);
}
}
return false;
@ -287,8 +300,8 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
exchange.terminateResponse();
HttpResponse response = exchange.getResponse();
Response.Listener listener = exchange.getConversation().getResponseListener();
responseNotifier.notifySuccess(listener, response);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
responseNotifier.notifySuccess(listeners, response);
LOG.debug("Received {}", response);
Result result = completion.getReference();
@ -296,7 +309,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
connection.complete(exchange, !result.isFailed());
responseNotifier.notifyComplete(listener, result);
responseNotifier.notifyComplete(listeners, result);
}
return true;
@ -330,7 +343,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
HttpResponse response = exchange.getResponse();
HttpConversation conversation = exchange.getConversation();
responseNotifier.notifyFailure(conversation.getResponseListener(), response, failure);
responseNotifier.notifyFailure(conversation.getResponseListeners(), response, failure);
LOG.debug("Failed {} {}", response, failure);
Result result = completion.getReference();
@ -338,7 +351,7 @@ public class HttpReceiver implements HttpParser.ResponseHandler<ByteBuffer>
{
connection.complete(exchange, false);
responseNotifier.notifyComplete(conversation.getResponseListener(), result);
responseNotifier.notifyComplete(conversation.getResponseListeners(), result);
}
return true;

View File

@ -50,7 +50,8 @@ public class HttpRequest implements Request
private final HttpFields headers = new HttpFields();
private final Fields params = new Fields();
private final Map<String, Object> attributes = new HashMap<>();
private final List<RequestListener> listeners = new ArrayList<>();
private final List<RequestListener> requestListeners = new ArrayList<>();
private final List<Response.ResponseListener> responseListeners = new ArrayList<>();
private final HttpClient client;
private final long conversation;
private final String host;
@ -240,10 +241,10 @@ public class HttpRequest implements Request
}
@Override
public <T extends RequestListener> List<T> getListeners(Class<T> type)
public <T extends RequestListener> List<T> getRequestListeners(Class<T> type)
{
ArrayList<T> result = new ArrayList<>();
for (RequestListener listener : listeners)
for (RequestListener listener : requestListeners)
if (type == null || type.isInstance(listener))
result.add((T)listener);
return result;
@ -252,42 +253,77 @@ public class HttpRequest implements Request
@Override
public Request listener(Request.Listener listener)
{
this.listeners.add(listener);
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestQueued(QueuedListener listener)
{
this.listeners.add(listener);
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestBegin(BeginListener listener)
{
this.listeners.add(listener);
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestHeaders(HeadersListener listener)
{
this.listeners.add(listener);
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestSuccess(SuccessListener listener)
{
this.listeners.add(listener);
this.requestListeners.add(listener);
return this;
}
@Override
public Request onRequestFailure(FailureListener listener)
{
this.listeners.add(listener);
this.requestListeners.add(listener);
return this;
}
@Override
public Request onResponseBegin(Response.BeginListener listener)
{
this.responseListeners.add(listener);
return this;
}
@Override
public Request onResponseHeaders(Response.HeadersListener listener)
{
this.responseListeners.add(listener);
return this;
}
@Override
public Request onResponseContent(Response.ContentListener listener)
{
this.responseListeners.add(listener);
return this;
}
@Override
public Request onResponseSuccess(Response.SuccessListener listener)
{
this.responseListeners.add(listener);
return this;
}
@Override
public Request onResponseFailure(Response.FailureListener listener)
{
this.responseListeners.add(listener);
return this;
}
@ -359,9 +395,11 @@ public class HttpRequest implements Request
}
@Override
public void send(final Response.Listener listener)
public void send(Response.CompleteListener listener)
{
client.send(this, listener);
if (listener != null)
responseListeners.add(listener);
client.send(this, responseListeners);
}
@Override

View File

@ -18,6 +18,9 @@
package org.eclipse.jetty.client;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields;
@ -27,15 +30,15 @@ public class HttpResponse implements Response
{
private final HttpFields headers = new HttpFields();
private final Request request;
private final Listener listener;
private final List<ResponseListener> listeners;
private HttpVersion version;
private int status;
private String reason;
public HttpResponse(Request request, Listener listener)
public HttpResponse(Request request, List<ResponseListener> listeners)
{
this.request = request;
this.listener = listener;
this.listeners = listeners;
}
public HttpVersion getVersion()
@ -85,9 +88,13 @@ public class HttpResponse implements Response
}
@Override
public Listener getListener()
public <T extends ResponseListener> List<T> getListeners(Class<T> type)
{
return listener;
ArrayList<T> result = new ArrayList<>();
for (ResponseListener listener : listeners)
if (type == null || type.isInstance(listener))
result.add((T)listener);
return result;
}
@Override

View File

@ -19,8 +19,10 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicMarkableReference;
@ -67,12 +69,17 @@ public class HttpSender
// Arrange the listeners, so that if there is a request failure the proper listeners are notified
HttpConversation conversation = exchange.getConversation();
Response.Listener currentListener = exchange.getResponseListener();
Response.Listener initialListener = conversation.getExchanges().peekFirst().getResponseListener();
if (initialListener == currentListener)
conversation.setResponseListener(initialListener);
HttpExchange initialExchange = conversation.getExchanges().peekFirst();
if (initialExchange == exchange)
{
conversation.setResponseListeners(exchange.getResponseListeners());
}
else
conversation.setResponseListener(new DoubleResponseListener(responseNotifier, currentListener, initialListener));
{
List<Response.ResponseListener> listeners = new ArrayList<>(exchange.getResponseListeners());
listeners.addAll(initialExchange.getResponseListeners());
conversation.setResponseListeners(listeners);
}
Request request = exchange.getRequest();
if (request.aborted())
@ -359,7 +366,7 @@ public class HttpSender
connection.complete(exchange, !result.isFailed());
HttpConversation conversation = exchange.getConversation();
responseNotifier.notifyComplete(conversation.getResponseListener(), result);
responseNotifier.notifyComplete(conversation.getResponseListeners(), result);
}
return true;
@ -405,7 +412,7 @@ public class HttpSender
connection.complete(exchange, false);
HttpConversation conversation = exchange.getConversation();
responseNotifier.notifyComplete(conversation.getResponseListener(), result);
responseNotifier.notifyComplete(conversation.getResponseListeners(), result);
}
return true;

View File

@ -18,10 +18,11 @@
package org.eclipse.jetty.client;
import java.util.List;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpMethod;
public class RedirectProtocolHandler extends Response.Listener.Empty implements ProtocolHandler
@ -115,20 +116,11 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
++redirects;
conversation.setAttribute(ATTRIBUTE, redirects);
Request redirect = client.newRequest(request.getConversationID(), location);
Request redirect = client.copyRequest(request, location);
// Use given method
redirect.method(method);
redirect.version(request.getVersion());
// Copy headers
for (HttpFields.Field header : request.getHeaders())
redirect.header(header.getName(), header.getValue());
// Copy content
redirect.content(request.getContent());
redirect.onRequestBegin(new Request.BeginListener()
{
@Override
@ -139,7 +131,7 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
}
});
redirect.send(new Response.Listener.Empty());
redirect.send(null);
}
else
{
@ -152,9 +144,9 @@ public class RedirectProtocolHandler extends Response.Listener.Empty implements
Request request = result.getRequest();
Response response = result.getResponse();
HttpConversation conversation = client.getConversation(request.getConversationID(), false);
Response.Listener listener = conversation.getExchanges().peekFirst().getResponseListener();
List<Response.ResponseListener> listeners = conversation.getExchanges().peekFirst().getResponseListeners();
// TODO: should we replay all events, or just the failure ?
notifier.notifyFailure(listener, response, failure);
notifier.notifyComplete(listener, new Result(request, response, failure));
notifier.notifyFailure(listeners, response, failure);
notifier.notifyComplete(listeners, new Result(request, response, failure));
}
}

View File

@ -35,7 +35,7 @@ public class RequestNotifier
public void notifyQueued(Request request)
{
for (Request.QueuedListener listener : request.getListeners(Request.QueuedListener.class))
for (Request.QueuedListener listener : request.getRequestListeners(Request.QueuedListener.class))
notifyQueued(listener, request);
for (Request.Listener listener : client.getRequestListeners())
notifyQueued(listener, request);
@ -56,7 +56,7 @@ public class RequestNotifier
public void notifyBegin(Request request)
{
for (Request.BeginListener listener : request.getListeners(Request.BeginListener.class))
for (Request.BeginListener listener : request.getRequestListeners(Request.BeginListener.class))
notifyBegin(listener, request);
for (Request.Listener listener : client.getRequestListeners())
notifyBegin(listener, request);
@ -77,7 +77,7 @@ public class RequestNotifier
public void notifyHeaders(Request request)
{
for (Request.HeadersListener listener : request.getListeners(Request.HeadersListener.class))
for (Request.HeadersListener listener : request.getRequestListeners(Request.HeadersListener.class))
notifyHeaders(listener, request);
for (Request.Listener listener : client.getRequestListeners())
notifyHeaders(listener, request);
@ -98,7 +98,7 @@ public class RequestNotifier
public void notifySuccess(Request request)
{
for (Request.SuccessListener listener : request.getListeners(Request.SuccessListener.class))
for (Request.SuccessListener listener : request.getRequestListeners(Request.SuccessListener.class))
notifySuccess(listener, request);
for (Request.Listener listener : client.getRequestListeners())
notifySuccess(listener, request);
@ -119,7 +119,7 @@ public class RequestNotifier
public void notifyFailure(Request request, Throwable failure)
{
for (Request.FailureListener listener : request.getListeners(Request.FailureListener.class))
for (Request.FailureListener listener : request.getRequestListeners(Request.FailureListener.class))
notifyFailure(listener, request, failure);
for (Request.Listener listener : client.getRequestListeners())
notifyFailure(listener, request, failure);

View File

@ -19,6 +19,7 @@
package org.eclipse.jetty.client;
import java.nio.ByteBuffer;
import java.util.List;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
@ -37,12 +38,18 @@ public class ResponseNotifier
this.client = client;
}
public void notifyBegin(Response.Listener listener, Response response)
public void notifyBegin(List<Response.ResponseListener> listeners, Response response)
{
for (Response.ResponseListener listener : listeners)
if (listener instanceof Response.BeginListener)
notifyBegin((Response.BeginListener)listener, response);
}
private void notifyBegin(Response.BeginListener listener, Response response)
{
try
{
if (listener != null)
listener.onBegin(response);
listener.onBegin(response);
}
catch (Exception x)
{
@ -50,12 +57,18 @@ public class ResponseNotifier
}
}
public void notifyHeaders(Response.Listener listener, Response response)
public void notifyHeaders(List<Response.ResponseListener> listeners, Response response)
{
for (Response.ResponseListener listener : listeners)
if (listener instanceof Response.HeadersListener)
notifyHeaders((Response.HeadersListener)listener, response);
}
private void notifyHeaders(Response.HeadersListener listener, Response response)
{
try
{
if (listener != null)
listener.onHeaders(response);
listener.onHeaders(response);
}
catch (Exception x)
{
@ -63,12 +76,19 @@ public class ResponseNotifier
}
}
public void notifyContent(Response.Listener listener, Response response, ByteBuffer buffer)
public void notifyContent(List<Response.ResponseListener> listeners, Response response, ByteBuffer buffer)
{
for (Response.ResponseListener listener : listeners)
if (listener instanceof Response.ContentListener)
notifyContent((Response.ContentListener)listener, response, buffer);
}
private void notifyContent(Response.ContentListener listener, Response response, ByteBuffer buffer)
{
try
{
if (listener != null)
listener.onContent(response, buffer);
listener.onContent(response, buffer);
}
catch (Exception x)
{
@ -76,12 +96,18 @@ public class ResponseNotifier
}
}
public void notifySuccess(Response.Listener listener, Response response)
public void notifySuccess(List<Response.ResponseListener> listeners, Response response)
{
for (Response.ResponseListener listener : listeners)
if (listener instanceof Response.SuccessListener)
notifySuccess((Response.SuccessListener)listener, response);
}
private void notifySuccess(Response.SuccessListener listener, Response response)
{
try
{
if (listener != null)
listener.onSuccess(response);
listener.onSuccess(response);
}
catch (Exception x)
{
@ -89,12 +115,18 @@ public class ResponseNotifier
}
}
public void notifyFailure(Response.Listener listener, Response response, Throwable failure)
public void notifyFailure(List<Response.ResponseListener> listeners, Response response, Throwable failure)
{
for (Response.ResponseListener listener : listeners)
if (listener instanceof Response.FailureListener)
notifyFailure((Response.FailureListener)listener, response, failure);
}
private void notifyFailure(Response.FailureListener listener, Response response, Throwable failure)
{
try
{
if (listener != null)
listener.onFailure(response, failure);
listener.onFailure(response, failure);
}
catch (Exception x)
{
@ -102,12 +134,18 @@ public class ResponseNotifier
}
}
public void notifyComplete(Response.Listener listener, Result result)
public void notifyComplete(List<Response.ResponseListener> listeners, Result result)
{
for (Response.ResponseListener listener : listeners)
if (listener instanceof Response.CompleteListener)
notifyComplete((Response.CompleteListener)listener, result);
}
private void notifyComplete(Response.CompleteListener listener, Result result)
{
try
{
if (listener != null)
listener.onComplete(result);
listener.onComplete(result);
}
catch (Exception x)
{
@ -115,37 +153,37 @@ public class ResponseNotifier
}
}
public void forwardSuccess(Response.Listener listener, Response response)
public void forwardSuccess(List<Response.ResponseListener> listeners, Response response)
{
notifyBegin(listener, response);
notifyHeaders(listener, response);
notifyBegin(listeners, response);
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
notifySuccess(listener, response);
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
notifySuccess(listeners, response);
}
public void forwardSuccessComplete(Response.Listener listener, Request request, Response response)
public void forwardSuccessComplete(List<Response.ResponseListener> listeners, Request request, Response response)
{
HttpConversation conversation = client.getConversation(request.getConversationID(), false);
forwardSuccess(listener, response);
forwardSuccess(listeners, response);
conversation.complete();
notifyComplete(listener, new Result(request, response));
notifyComplete(listeners, new Result(request, response));
}
public void forwardFailure(Response.Listener listener, Response response, Throwable failure)
public void forwardFailure(List<Response.ResponseListener> listeners, Response response, Throwable failure)
{
notifyBegin(listener, response);
notifyHeaders(listener, response);
notifyBegin(listeners, response);
notifyHeaders(listeners, response);
if (response instanceof ContentResponse)
notifyContent(listener, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
notifyFailure(listener, response, failure);
notifyContent(listeners, response, ByteBuffer.wrap(((ContentResponse)response).getContent()));
notifyFailure(listeners, response, failure);
}
public void forwardFailureComplete(Response.Listener listener, Request request, Throwable requestFailure, Response response, Throwable responseFailure)
public void forwardFailureComplete(List<Response.ResponseListener> listeners, Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{
HttpConversation conversation = client.getConversation(request.getConversationID(), false);
forwardFailure(listener, response, responseFailure);
forwardFailure(listeners, response, responseFailure);
conversation.complete();
notifyComplete(listener, new Result(request, requestFailure, response, responseFailure));
notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
}
}

View File

@ -37,7 +37,7 @@ public interface Connection extends AutoCloseable
* @param request the request to send
* @param listener the response listener
*/
void send(Request request, Response.Listener listener);
void send(Request request, Response.CompleteListener listener);
@Override
void close();

View File

@ -210,44 +210,74 @@ public interface Request
* @param listenerClass the class of the listener, or null for all listeners classes
* @return the listeners for request events of the given class
*/
<T extends RequestListener> List<T> getListeners(Class<T> listenerClass);
<T extends RequestListener> List<T> getRequestListeners(Class<T> listenerClass);
/**
* @param listener the listener for request events
* @param listener a listener for request events
* @return this request object
*/
Request listener(Listener listener);
/**
* @param listener the listener for request queued events
* @param listener a listener for request queued event
* @return this request object
*/
Request onRequestQueued(QueuedListener listener);
/**
* @param listener the listener for request begin events
* @param listener a listener for request begin event
* @return this request object
*/
Request onRequestBegin(BeginListener listener);
/**
* @param listener the listener for request headers events
* @param listener a listener for request headers event
* @return this request object
*/
Request onRequestHeaders(HeadersListener listener);
/**
* @param listener the listener for request headers events
* @param listener a listener for request success event
* @return this request object
*/
Request onRequestSuccess(SuccessListener listener);
/**
* @param listener the listener for request headers events
* @param listener a listener for request failure event
* @return this request object
*/
Request onRequestFailure(FailureListener listener);
/**
* @param listener a listener for response begin event
* @return this request object
*/
Request onResponseBegin(Response.BeginListener listener);
/**
* @param listener a listener for response headers event
* @return this request object
*/
Request onResponseHeaders(Response.HeadersListener listener);
/**
* @param listener a listener for response content events
* @return this request object
*/
Request onResponseContent(Response.ContentListener listener);
/**
* @param listener a listener for response success event
* @return this request object
*/
Request onResponseSuccess(Response.SuccessListener listener);
/**
* @param listener a listener for response failure event
* @return this request object
*/
Request onResponseFailure(Response.FailureListener listener);
/**
* Sends this request and returns a {@link Future} that can be used to wait for the
* request and the response to be completed (either with a success or a failure).
@ -273,7 +303,7 @@ public interface Request
*
* @param listener the listener that receives response events
*/
void send(Response.Listener listener);
void send(Response.CompleteListener listener);
/**
* Attempts to abort the send of this request.

View File

@ -19,6 +19,8 @@
package org.eclipse.jetty.client.api;
import java.nio.ByteBuffer;
import java.util.EventListener;
import java.util.List;
import org.eclipse.jetty.client.util.BufferingResponseListener;
import org.eclipse.jetty.http.HttpFields;
@ -42,9 +44,9 @@ public interface Response
long getConversationID();
/**
* @return the response listener passed to {@link Request#send(Listener)}
* @return the response listener passed to {@link Request#send(CompleteListener)}
*/
Listener getListener();
<T extends ResponseListener> List<T> getListeners(Class<T> listenerClass);
/**
* @return the HTTP version of this response, such as "HTTP/1.1"
@ -74,10 +76,11 @@ public interface Response
*/
boolean abort(String reason);
/**
* Listener for response events
*/
public interface Listener
public interface ResponseListener extends EventListener
{
}
public interface BeginListener extends ResponseListener
{
/**
* Callback method invoked when the response line containing HTTP version,
@ -88,14 +91,20 @@ public interface Response
* @param response the response containing the response line data
*/
public void onBegin(Response response);
}
public interface HeadersListener extends ResponseListener
{
/**
* Callback method invoked when the response headers have been received and parsed.
*
* @param response the response containing the response line data and the headers
*/
public void onHeaders(Response response);
}
public interface ContentListener extends ResponseListener
{
/**
* Callback method invoked when the response content has been received.
* This method may be invoked multiple times, and the {@code content} buffer must be consumed
@ -105,14 +114,20 @@ public interface Response
* @param content the content bytes received
*/
public void onContent(Response response, ByteBuffer content);
}
public interface SuccessListener extends ResponseListener
{
/**
* Callback method invoked when the whole response has been successfully received.
*
* @param response the response containing the response line data and the headers
*/
public void onSuccess(Response response);
}
public interface FailureListener extends ResponseListener
{
/**
* Callback method invoked when the response has failed in the process of being received
*
@ -120,7 +135,10 @@ public interface Response
* @param failure the failure happened
*/
public void onFailure(Response response, Throwable failure);
}
public interface CompleteListener extends ResponseListener
{
/**
* Callback method invoked when the request <em><b>and</b></em> the response have been processed,
* either successfully or not.
@ -129,13 +147,20 @@ public interface Response
* <p/>
* Requests may complete <em>after</em> response, for example in case of big uploads that are
* discarded or read asynchronously by the server.
* This method is always invoked <em>after</em> {@link #onSuccess(Response)} or
* {@link #onFailure(Response, Throwable)}, and only when request indicates that it is completed.
* This method is always invoked <em>after</em> {@link SuccessListener#onSuccess(Response)} or
* {@link FailureListener#onFailure(Response, Throwable)}, and only when request indicates that
* it is completed.
*
* @param result the result of the request / response exchange
*/
public void onComplete(Result result);
}
/**
* Listener for response events
*/
public interface Listener extends BeginListener, HeadersListener, ContentListener, SuccessListener, FailureListener, CompleteListener
{
/**
* An empty implementation of {@link Listener}
*/

View File

@ -46,7 +46,6 @@ public class BlockingResponseListener extends BufferingResponseListener implemen
@Override
public void onComplete(Result result)
{
super.onComplete(result);
response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding());
failure = result.getFailure();
latch.countDown();

View File

@ -33,7 +33,7 @@ import org.eclipse.jetty.http.HttpHeader;
* <p>The content may be retrieved from {@link #onSuccess(Response)} or {@link #onComplete(Result)}
* via {@link #getContent()} or {@link #getContentAsString()}.</p>
*/
public class BufferingResponseListener extends Response.Listener.Empty
public abstract class BufferingResponseListener extends Response.Listener.Empty
{
private final int maxLength;
private volatile byte[] buffer = new byte[0];
@ -95,6 +95,9 @@ public class BufferingResponseListener extends Response.Listener.Empty
buffer = newBuffer;
}
@Override
public abstract void onComplete(Result result);
public String getEncoding()
{
return encoding;

View File

@ -99,14 +99,16 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
public void test_BasicAuthentication() throws Exception
{
startBasic(new EmptyServerHandler());
test_Authentication(new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic"));
String uri = scheme + "://localhost:" + connector.getLocalPort();
test_Authentication(new BasicAuthentication(uri, realm, "basic", "basic"));
}
@Test
public void test_DigestAuthentication() throws Exception
{
startDigest(new EmptyServerHandler());
test_Authentication(new DigestAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "digest", "digest"));
String uri = scheme + "://localhost:" + connector.getLocalPort();
test_Authentication(new DigestAuthentication(uri, realm, "digest", "digest"));
}
private void test_Authentication(Authentication authentication) throws Exception
@ -189,7 +191,8 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
}
});
client.getAuthenticationStore().addAuthentication(new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic"));
String uri = scheme + "://localhost:" + connector.getLocalPort();
client.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, "basic", "basic"));
final CountDownLatch requests = new CountDownLatch(3);
Request.Listener.Empty requestListener = new Request.Listener.Empty()
@ -227,7 +230,8 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
}
});
client.getAuthenticationStore().addAuthentication(new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic"));
String uri = scheme + "://localhost:" + connector.getLocalPort();
client.getAuthenticationStore().addAuthentication(new BasicAuthentication(uri, realm, "basic", "basic"));
final CountDownLatch requests = new CountDownLatch(3);
Request.Listener.Empty requestListener = new Request.Listener.Empty()
@ -268,7 +272,8 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
client.getRequestListeners().add(requestListener);
AuthenticationStore authenticationStore = client.getAuthenticationStore();
BasicAuthentication authentication = new BasicAuthentication(scheme + "://localhost:" + connector.getLocalPort(), realm, "basic", "basic");
String uri = scheme + "://localhost:" + connector.getLocalPort();
BasicAuthentication authentication = new BasicAuthentication(uri, realm, "basic", "basic");
authenticationStore.addAuthentication(authentication);
Request request = client.newRequest("localhost", connector.getLocalPort()).scheme(scheme).path("/secure");

View File

@ -67,7 +67,7 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
client.setMaxQueueSizePerAddress(1024 * 1024);
client.setDispatchIO(false);
Random random = new Random(1000L);
Random random = new Random();
int iterations = 500;
CountDownLatch latch = new CountDownLatch(iterations);
List<String> failures = new ArrayList<>();
@ -132,15 +132,15 @@ public class HttpClientLoadTest extends AbstractHttpClientServerTest
else if (!ssl && random.nextBoolean())
request.header("X-Close", "true");
int contentLength = random.nextInt(maxContentLength) + 1;
switch (method)
{
case GET:
// Randomly ask the server to download data upon this GET request
if (random.nextBoolean())
request.header("X-Download", String.valueOf(random.nextInt(maxContentLength) + 1));
request.header("X-Download", String.valueOf(contentLength));
break;
case POST:
int contentLength = random.nextInt(maxContentLength) + 1;
request.header("X-Upload", String.valueOf(contentLength));
request.content(new BytesContentProvider(new byte[contentLength]));
break;

View File

@ -371,18 +371,19 @@ public class HttpClientTest extends AbstractHttpClientServerTest
latch.countDown();
}
})
.send(new Response.Listener.Empty()
.onResponseFailure(new Response.FailureListener()
{
@Override
public void onFailure(Response response, Throwable failure)
{
latch.countDown();
}
});
})
.send(null);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.Listener.Empty()
.onResponseSuccess(new Response.SuccessListener()
{
@Override
public void onSuccess(Response response)
@ -390,7 +391,8 @@ public class HttpClientTest extends AbstractHttpClientServerTest
Assert.assertEquals(200, response.getStatus());
latch.countDown();
}
});
})
.send(null);
Assert.assertTrue(latch.await(5 * idleTimeout, TimeUnit.MILLISECONDS));
}

View File

@ -76,7 +76,7 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
successLatch.countDown();
}
})
.send(new Response.Listener.Empty()
.onResponseHeaders(new Response.HeadersListener()
{
@Override
public void onHeaders(Response response)
@ -85,7 +85,9 @@ public class HttpConnectionLifecycleTest extends AbstractHttpClientServerTest
Assert.assertEquals(1, activeConnections.size());
headersLatch.countDown();
}
})
.send(new Response.Listener.Empty()
{
@Override
public void onSuccess(Response response)
{

View File

@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
@ -73,7 +74,7 @@ public class HttpReceiverTest
{
HttpRequest request = new HttpRequest(client, URI.create("http://localhost"));
BlockingResponseListener listener = new BlockingResponseListener(request);
HttpExchange exchange = new HttpExchange(conversation, connection, request, listener);
HttpExchange exchange = new HttpExchange(conversation, connection, request, Collections.<Response.ResponseListener>singletonList(listener));
conversation.getExchanges().offer(exchange);
connection.setExchange(exchange);
exchange.requestComplete(null);
@ -89,7 +90,7 @@ public class HttpReceiverTest
"Content-length: 0\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0);
exchange.receive();
Response response = listener.get(5, TimeUnit.SECONDS);
@ -113,7 +114,7 @@ public class HttpReceiverTest
"\r\n" +
content);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0);
exchange.receive();
Response response = listener.get(5, TimeUnit.SECONDS);
@ -140,7 +141,7 @@ public class HttpReceiverTest
"\r\n" +
content1);
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0);
exchange.receive();
endPoint.setInputEOF();
exchange.receive();
@ -164,7 +165,7 @@ public class HttpReceiverTest
"Content-length: 1\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0);
exchange.receive();
// Simulate an idle timeout
connection.idleTimeout();
@ -188,7 +189,7 @@ public class HttpReceiverTest
"Content-length: A\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0);
exchange.receive();
try
@ -219,7 +220,7 @@ public class HttpReceiverTest
"Content-Encoding: gzip\r\n" +
"\r\n");
HttpExchange exchange = newExchange();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListener();
BlockingResponseListener listener = (BlockingResponseListener)exchange.getResponseListeners().get(0);
exchange.receive();
endPoint.reset();

View File

@ -55,14 +55,16 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.Listener.Empty()
.onResponseBegin(new Response.BeginListener()
{
@Override
public void onBegin(Response response)
{
response.abort(null);
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
@ -81,13 +83,16 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.Listener.Empty()
.onResponseHeaders(new Response.HeadersListener()
{
@Override
public void onHeaders(Response response)
{
response.abort(null);
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
@ -126,14 +131,16 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
final CountDownLatch latch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.send(new Response.Listener.Empty()
.onResponseContent(new Response.ContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content)
{
response.abort(null);
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{

View File

@ -79,7 +79,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(request, null);
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -96,7 +96,7 @@ public class HttpSenderTest
HttpDestination destination = new HttpDestination(client, "http", "localhost", 8080);
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
connection.send(request, null);
connection.send(request, (Response.CompleteListener)null);
// This take will free space in the buffer and allow for the write to complete
StringBuilder builder = new StringBuilder(endPoint.takeOutputString());
@ -126,7 +126,7 @@ public class HttpSenderTest
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.onRequestFailure(new Request.FailureListener()
request.listener(new Request.Listener.Empty()
{
@Override
public void onFailure(Request request, Throwable x)
@ -155,7 +155,7 @@ public class HttpSenderTest
HttpConnection connection = new HttpConnection(client, endPoint, destination);
Request request = client.newRequest(URI.create("http://localhost/"));
final CountDownLatch failureLatch = new CountDownLatch(2);
request.onRequestFailure(new Request.FailureListener()
request.listener(new Request.Listener.Empty()
{
@Override
public void onFailure(Request request, Throwable x)
@ -207,7 +207,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(request, null);
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -242,7 +242,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(request, null);
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));
@ -284,7 +284,7 @@ public class HttpSenderTest
successLatch.countDown();
}
});
connection.send(request, null);
connection.send(request, (Response.CompleteListener)null);
String requestString = endPoint.takeOutputString();
Assert.assertTrue(requestString.startsWith("GET "));