424743 - Verify abort behavior in case the total timeout expires

before the connect timeout.

The changes to fix this issue uncovered problems in the HttpSender
state machine.
In particular, the SenderState is now defining more states that
depend on deferred content, and on handling of 100 Continue responses.

The refactoring also highlighted the fact that there was no need to
keep HttpConversation objects in a Map in HttpClient: they are now
only referenced by the HttpRequest.
With this change, Request.getConversationID() has been deprecated.

Also fixed a number of tests to make them more reliable.
This commit is contained in:
Simone Bordet 2014-01-05 15:28:45 +01:00
parent b3947ea0b8
commit e94ff7db9c
29 changed files with 472 additions and 291 deletions

View File

@ -109,12 +109,19 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
} }
finally finally
{ {
connectFailed(context, x);
}
}
}
protected void connectFailed(Map<String, Object> context, Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("Could not connect to {}", context.get(HTTP_DESTINATION_CONTEXT_KEY));
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY); Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x); promise.failed(x);
} }
}
}
protected void configure(HttpClient client, SocketChannel channel) throws IOException protected void configure(HttpClient client, SocketChannel channel) throws IOException
{ {
@ -156,9 +163,7 @@ public abstract class AbstractHttpClientTransport extends ContainerLifeCycle imp
{ {
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
Map<String, Object> context = (Map<String, Object>)attachment; Map<String, Object> context = (Map<String, Object>)attachment;
@SuppressWarnings("unchecked") connectFailed(context, x);
Promise<Connection> promise = (Promise<Connection>)context.get(HTTP_CONNECTION_PROMISE_CONTEXT_KEY);
promise.failed(x);
} }
} }
} }

View File

@ -81,7 +81,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
@Override @Override
public void onComplete(Result result) public void onComplete(Result result)
{ {
Request request = result.getRequest(); HttpRequest request = (HttpRequest)result.getRequest();
ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding()); ContentResponse response = new HttpContentResponse(result.getResponse(), getContent(), getEncoding());
if (result.isFailed()) if (result.isFailed())
{ {
@ -91,7 +91,7 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
return; return;
} }
HttpConversation conversation = client.getConversation(request.getConversationID(), false); HttpConversation conversation = request.getConversation();
if (conversation.getAttribute(AUTHENTICATION_ATTRIBUTE) != null) if (conversation.getAttribute(AUTHENTICATION_ATTRIBUTE) != null)
{ {
// We have already tried to authenticate, but we failed again // We have already tried to authenticate, but we failed again
@ -153,16 +153,16 @@ public abstract class AuthenticationProtocolHandler implements ProtocolHandler
}).send(null); }).send(null);
} }
private void forwardSuccessComplete(Request request, Response response) private void forwardSuccessComplete(HttpRequest request, Response response)
{ {
HttpConversation conversation = client.getConversation(request.getConversationID(), false); HttpConversation conversation = request.getConversation();
conversation.updateResponseListeners(null); conversation.updateResponseListeners(null);
notifier.forwardSuccessComplete(conversation.getResponseListeners(), request, response); notifier.forwardSuccessComplete(conversation.getResponseListeners(), request, response);
} }
private void forwardFailureComplete(Request request, Throwable requestFailure, Response response, Throwable responseFailure) private void forwardFailureComplete(HttpRequest request, Throwable requestFailure, Response response, Throwable responseFailure)
{ {
HttpConversation conversation = client.getConversation(request.getConversationID(), false); HttpConversation conversation = request.getConversation();
conversation.updateResponseListeners(null); conversation.updateResponseListeners(null);
notifier.forwardFailureComplete(conversation.getResponseListeners(), request, requestFailure, response, responseFailure); notifier.forwardFailureComplete(conversation.getResponseListeners(), request, requestFailure, response, responseFailure);
} }

View File

@ -44,8 +44,8 @@ public class ContinueProtocolHandler implements ProtocolHandler
public boolean accept(Request request, Response response) public boolean accept(Request request, Response response)
{ {
boolean expect100 = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()); boolean expect100 = request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
HttpConversation conversation = client.getConversation(request.getConversationID(), false); HttpConversation conversation = ((HttpRequest)request).getConversation();
boolean handled100 = conversation != null && conversation.getAttribute(ATTRIBUTE) != null; boolean handled100 = conversation.getAttribute(ATTRIBUTE) != null;
return expect100 && !handled100; return expect100 && !handled100;
} }
@ -64,7 +64,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
// Handling of success must be done here and not from onComplete(), // Handling of success must be done here and not from onComplete(),
// since the onComplete() is not invoked because the request is not completed yet. // since the onComplete() is not invoked because the request is not completed yet.
HttpConversation conversation = client.getConversation(response.getConversationID(), false); HttpConversation conversation = ((HttpRequest)response.getRequest()).getConversation();
// Mark the 100 Continue response as handled // Mark the 100 Continue response as handled
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE); conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
@ -79,7 +79,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
{ {
// All good, continue // All good, continue
exchange.resetResponse(true); exchange.resetResponse(true);
exchange.proceed(true); exchange.proceed(null);
break; break;
} }
default: default:
@ -90,7 +90,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
List<Response.ResponseListener> listeners = exchange.getResponseListeners(); List<Response.ResponseListener> listeners = exchange.getResponseListeners();
HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding()); HttpContentResponse contentResponse = new HttpContentResponse(response, getContent(), getEncoding());
notifier.forwardSuccess(listeners, contentResponse); notifier.forwardSuccess(listeners, contentResponse);
exchange.proceed(false); exchange.proceed(new HttpRequestException("Expectation failed", exchange.getRequest()));
break; break;
} }
} }
@ -99,7 +99,7 @@ public class ContinueProtocolHandler implements ProtocolHandler
@Override @Override
public void onFailure(Response response, Throwable failure) public void onFailure(Response response, Throwable failure)
{ {
HttpConversation conversation = client.getConversation(response.getConversationID(), false); HttpConversation conversation = ((HttpRequest)response.getRequest()).getConversation();
// Mark the 100 Continue response as handled // Mark the 100 Continue response as handled
conversation.setAttribute(ATTRIBUTE, Boolean.TRUE); conversation.setAttribute(ATTRIBUTE, Boolean.TRUE);
// Reset the conversation listeners to allow the conversation to be completed // Reset the conversation listeners to allow the conversation to be completed

View File

@ -43,11 +43,16 @@ public abstract class HttpChannel
public void associate(HttpExchange exchange) public void associate(HttpExchange exchange)
{ {
if (!this.exchange.compareAndSet(null, exchange)) if (this.exchange.compareAndSet(null, exchange))
throw new UnsupportedOperationException("Pipelined requests not supported"); {
exchange.associate(this); exchange.associate(this);
LOG.debug("{} associated to {}", exchange, this); LOG.debug("{} associated to {}", exchange, this);
} }
else
{
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
}
}
public HttpExchange disassociate() public HttpExchange disassociate()
{ {
@ -65,7 +70,7 @@ public abstract class HttpChannel
public abstract void send(); public abstract void send();
public abstract void proceed(HttpExchange exchange, boolean proceed); public abstract void proceed(HttpExchange exchange, Throwable failure);
public abstract boolean abort(Throwable cause); public abstract boolean abort(Throwable cause);

View File

@ -381,12 +381,12 @@ public class HttpClient extends ContainerLifeCycle
*/ */
public Request newRequest(URI uri) public Request newRequest(URI uri)
{ {
return new HttpRequest(this, uri); return newHttpRequest(newConversation(), uri);
} }
protected Request copyRequest(Request oldRequest, URI newURI) protected Request copyRequest(HttpRequest oldRequest, URI newURI)
{ {
Request newRequest = new HttpRequest(this, oldRequest.getConversationID(), newURI); Request newRequest = newHttpRequest(oldRequest.getConversation(), newURI);
newRequest.method(oldRequest.getMethod()) newRequest.method(oldRequest.getMethod())
.version(oldRequest.getVersion()) .version(oldRequest.getVersion())
.content(oldRequest.getContent()) .content(oldRequest.getContent())
@ -417,6 +417,11 @@ public class HttpClient extends ContainerLifeCycle
return newRequest; return newRequest;
} }
protected HttpRequest newHttpRequest(HttpConversation conversation, URI uri)
{
return new HttpRequest(this, conversation, uri);
}
/** /**
* Returns a {@link Destination} for the given scheme, host and port. * Returns a {@link Destination} for the given scheme, host and port.
* Applications may use {@link Destination}s to create {@link Connection}s * Applications may use {@link Destination}s to create {@link Connection}s
@ -467,7 +472,7 @@ public class HttpClient extends ContainerLifeCycle
return new ArrayList<Destination>(destinations.values()); return new ArrayList<Destination>(destinations.values());
} }
protected void send(final Request request, List<Response.ResponseListener> listeners) protected void send(final HttpRequest request, List<Response.ResponseListener> listeners)
{ {
String scheme = request.getScheme().toLowerCase(Locale.ENGLISH); String scheme = request.getScheme().toLowerCase(Locale.ENGLISH);
if (!HttpScheme.HTTP.is(scheme) && !HttpScheme.HTTPS.is(scheme)) if (!HttpScheme.HTTP.is(scheme) && !HttpScheme.HTTPS.is(scheme))
@ -499,25 +504,9 @@ public class HttpClient extends ContainerLifeCycle
}); });
} }
protected HttpConversation getConversation(long id, boolean create) private HttpConversation newConversation()
{ {
HttpConversation conversation = conversations.get(id); return new HttpConversation();
if (conversation == null && create)
{
conversation = new HttpConversation(this, id);
HttpConversation existing = conversations.putIfAbsent(id, conversation);
if (existing != null)
conversation = existing;
else
LOG.debug("{} created", conversation);
}
return conversation;
}
protected void removeConversation(HttpConversation conversation)
{
conversations.remove(conversation.getID());
LOG.debug("{} removed", conversation);
} }
protected List<ProtocolHandler> getProtocolHandlers() protected List<ProtocolHandler> getProtocolHandlers()

View File

@ -69,8 +69,7 @@ public abstract class HttpConnection implements Connection
if (listener != null) if (listener != null)
listeners.add(listener); listeners.add(listener);
HttpConversation conversation = getHttpClient().getConversation(request.getConversationID(), true); HttpExchange exchange = new HttpExchange(getHttpDestination(), (HttpRequest)request, listeners);
HttpExchange exchange = new HttpExchange(conversation, getHttpDestination(), request, listeners);
send(exchange); send(exchange);
} }

View File

@ -24,6 +24,7 @@ import java.nio.charset.UnsupportedCharsetException;
import java.util.List; import java.util.List;
import org.eclipse.jetty.client.api.ContentResponse; import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.HttpVersion;
@ -42,9 +43,16 @@ public class HttpContentResponse implements ContentResponse
} }
@Override @Override
public Request getRequest()
{
return response.getRequest();
}
@Override
@Deprecated
public long getConversationID() public long getConversationID()
{ {
return response.getConversationID(); return getRequest().getConversationID();
} }
@Override @Override

View File

@ -22,22 +22,22 @@ import java.util.ArrayList;
import java.util.Deque; import java.util.Deque;
import java.util.List; import java.util.List;
import java.util.concurrent.ConcurrentLinkedDeque; import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.util.AttributesMap; import org.eclipse.jetty.util.AttributesMap;
public class HttpConversation extends AttributesMap public class HttpConversation extends AttributesMap
{ {
private static final AtomicLong ids = new AtomicLong();
private final Deque<HttpExchange> exchanges = new ConcurrentLinkedDeque<>(); private final Deque<HttpExchange> exchanges = new ConcurrentLinkedDeque<>();
private final HttpClient client;
private final long id; private final long id;
private volatile boolean complete;
private volatile List<Response.ResponseListener> listeners; private volatile List<Response.ResponseListener> listeners;
public HttpConversation(HttpClient client, long id) protected HttpConversation()
{ {
this.client = client; this.id = ids.incrementAndGet();
this.id = id;
} }
public long getID() public long getID()
@ -123,10 +123,6 @@ public class HttpConversation extends AttributesMap
*/ */
public void updateResponseListeners(Response.ResponseListener overrideListener) public void updateResponseListeners(Response.ResponseListener overrideListener)
{ {
// If we have no override listener, then the
// conversation may be completed at a later time
complete = overrideListener == null;
// Create a new instance to avoid that iterating over the listeners // Create a new instance to avoid that iterating over the listeners
// will notify a listener that may send a new request and trigger // will notify a listener that may send a new request and trigger
// another call to this method which will build different listeners // another call to this method which will build different listeners
@ -153,16 +149,10 @@ public class HttpConversation extends AttributesMap
this.listeners = listeners; this.listeners = listeners;
} }
public void complete()
{
if (complete)
client.removeConversation(this);
}
public boolean abort(Throwable cause) public boolean abort(Throwable cause)
{ {
HttpExchange exchange = exchanges.peekLast(); HttpExchange exchange = exchanges.peekLast();
return exchange.abort(cause); return exchange != null && exchange.abort(cause);
} }
@Override @Override

View File

@ -27,9 +27,7 @@ import java.util.concurrent.RejectedExecutionException;
import org.eclipse.jetty.client.api.Connection; import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.client.api.Destination; import org.eclipse.jetty.client.api.Destination;
import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpScheme; import org.eclipse.jetty.http.HttpScheme;
@ -155,7 +153,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
return hostField; return hostField;
} }
protected void send(Request request, List<Response.ResponseListener> listeners) protected void send(HttpRequest request, List<Response.ResponseListener> listeners)
{ {
if (!getScheme().equals(request.getScheme())) if (!getScheme().equals(request.getScheme()))
throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this); throw new IllegalArgumentException("Invalid request scheme " + request.getScheme() + " for destination " + this);
@ -165,8 +163,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
if (port >= 0 && getPort() != port) if (port >= 0 && getPort() != port)
throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this); throw new IllegalArgumentException("Invalid request port " + port + " for destination " + this);
HttpConversation conversation = client.getConversation(request.getConversationID(), true); HttpExchange exchange = new HttpExchange(this, request, listeners);
HttpExchange exchange = new HttpExchange(conversation, this, request, listeners);
if (client.isRunning()) if (client.isRunning())
{ {
@ -174,7 +171,7 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
{ {
if (!client.isRunning() && exchanges.remove(exchange)) if (!client.isRunning() && exchanges.remove(exchange))
{ {
throw new RejectedExecutionException(client + " is stopping"); request.abort(new RejectedExecutionException(client + " is stopping"));
} }
else else
{ {
@ -185,13 +182,13 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
} }
else else
{ {
LOG.debug("Max queued exceeded {}", request); LOG.debug("Max queue size {} exceeded by {}", client.getMaxRequestsQueuedPerDestination(), request);
abort(exchange, new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this)); request.abort(new RejectedExecutionException("Max requests per destination " + client.getMaxRequestsQueuedPerDestination() + " exceeded for " + this));
} }
} }
else else
{ {
throw new RejectedExecutionException(client + " is stopped"); request.abort(new RejectedExecutionException(client + " is stopped"));
} }
} }
@ -226,29 +223,13 @@ public abstract class HttpDestination implements Destination, Closeable, Dumpabl
* Aborts all the {@link HttpExchange}s queued in this destination. * Aborts all the {@link HttpExchange}s queued in this destination.
* *
* @param cause the abort cause * @param cause the abort cause
* @see #abort(HttpExchange, Throwable)
*/ */
public void abort(Throwable cause) public void abort(Throwable cause)
{ {
HttpExchange exchange; HttpExchange exchange;
while ((exchange = exchanges.poll()) != null) // Just peek(), the abort() will remove it from the queue.
abort(exchange, cause); while ((exchange = exchanges.peek()) != null)
} exchange.getRequest().abort(cause);
/**
* Aborts the given {@code exchange}, notifies listeners of the failure, and completes the exchange.
*
* @param exchange the {@link HttpExchange} to abort
* @param cause the abort cause
*/
protected void abort(HttpExchange exchange, Throwable cause)
{
Request request = exchange.getRequest();
HttpResponse response = exchange.getResponse();
getRequestNotifier().notifyFailure(request, cause);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
getResponseNotifier().notifyFailure(listeners, response, cause);
getResponseNotifier().notifyComplete(listeners, new Result(request, cause, response, cause));
} }
@Override @Override

View File

@ -37,29 +37,28 @@ public class HttpExchange
private final AtomicBoolean responseComplete = new AtomicBoolean(); private final AtomicBoolean responseComplete = new AtomicBoolean();
private final AtomicInteger complete = new AtomicInteger(); private final AtomicInteger complete = new AtomicInteger();
private final AtomicReference<HttpChannel> channel = new AtomicReference<>(); private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
private final HttpConversation conversation;
private final HttpDestination destination; private final HttpDestination destination;
private final Request request; private final HttpRequest request;
private final List<Response.ResponseListener> listeners; private final List<Response.ResponseListener> listeners;
private final HttpResponse response; private final HttpResponse response;
private volatile Throwable requestFailure; private volatile Throwable requestFailure;
private volatile Throwable responseFailure; private volatile Throwable responseFailure;
public HttpExchange(HttpConversation conversation, HttpDestination destination, Request request, List<Response.ResponseListener> listeners) public HttpExchange(HttpDestination destination, HttpRequest request, List<Response.ResponseListener> listeners)
{ {
this.conversation = conversation;
this.destination = destination; this.destination = destination;
this.request = request; this.request = request;
this.listeners = listeners; this.listeners = listeners;
this.response = new HttpResponse(request, listeners); this.response = new HttpResponse(request, listeners);
HttpConversation conversation = request.getConversation();
conversation.getExchanges().offer(this); conversation.getExchanges().offer(this);
conversation.updateResponseListeners(null); conversation.updateResponseListeners(null);
} }
public HttpConversation getConversation() public HttpConversation getConversation()
{ {
return conversation; return request.getConversation();
} }
public Request getRequest() public Request getRequest()
@ -121,11 +120,11 @@ public class HttpExchange
if (failure == null) if (failure == null)
{ {
int responseSuccess = 0b1100; int responseSuccess = 0b1100;
return terminate(responseSuccess, failure); return terminate(responseSuccess, null);
} }
else else
{ {
proceed(false); proceed(failure);
int responseFailure = 0b0100; int responseFailure = 0b0100;
return terminate(responseFailure, failure); return terminate(responseFailure, failure);
} }
@ -147,6 +146,19 @@ public class HttpExchange
* @return the {@link Result} - if any - associated with the status * @return the {@link Result} - if any - associated with the status
*/ */
private Result terminate(int code, Throwable failure) private Result terminate(int code, Throwable failure)
{
int current = update(code, failure);
int terminated = 0b0101;
if ((current & terminated) == terminated)
{
// Request and response terminated
LOG.debug("{} terminated", this);
return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
}
return null;
}
private int update(int code, Throwable failure)
{ {
int current; int current;
while (true) while (true)
@ -161,39 +173,27 @@ public class HttpExchange
current = candidate; current = candidate;
if ((code & 0b01) == 0b01) if ((code & 0b01) == 0b01)
requestFailure = failure; requestFailure = failure;
else if ((code & 0b0100) == 0b0100)
responseFailure = failure; responseFailure = failure;
LOG.debug("{} updated", this); LOG.debug("{} updated", this);
} }
break; break;
} }
return current;
int terminated = 0b0101;
if ((current & terminated) == terminated)
{
// Request and response terminated
LOG.debug("{} terminated", this);
conversation.complete();
return new Result(getRequest(), getRequestFailure(), getResponse(), getResponseFailure());
}
return null;
} }
public boolean abort(Throwable cause) public boolean abort(Throwable cause)
{ {
if (destination.remove(this)) if (destination.remove(this))
{ {
destination.abort(this, cause); LOG.debug("Aborting while queued {}: {}", this, cause);
LOG.debug("Aborted while queued {}: {}", this, cause); return fail(cause);
return true;
} }
else else
{ {
HttpChannel channel = this.channel.get(); HttpChannel channel = this.channel.get();
// If there is no channel, this exchange is already completed
if (channel == null) if (channel == null)
return false; return fail(cause);
boolean aborted = channel.abort(cause); boolean aborted = channel.abort(cause);
LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause); LOG.debug("Aborted while active ({}) {}: {}", aborted, this, cause);
@ -201,6 +201,23 @@ public class HttpExchange
} }
} }
private boolean fail(Throwable cause)
{
if (update(0b0101, cause) == 0b0101)
{
destination.getRequestNotifier().notifyFailure(request, cause);
List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
ResponseNotifier responseNotifier = destination.getResponseNotifier();
responseNotifier.notifyFailure(listeners, response, cause);
responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause));
return true;
}
else
{
return false;
}
}
public void resetResponse(boolean success) public void resetResponse(boolean success)
{ {
responseComplete.set(false); responseComplete.set(false);
@ -210,11 +227,11 @@ public class HttpExchange
complete.addAndGet(-code); complete.addAndGet(-code);
} }
public void proceed(boolean proceed) public void proceed(Throwable failure)
{ {
HttpChannel channel = this.channel.get(); HttpChannel channel = this.channel.get();
if (channel != null) if (channel != null)
channel.proceed(this, proceed); channel.proceed(this, failure);
} }
private String toString(int code) private String toString(int code)

View File

@ -346,7 +346,7 @@ public abstract class HttpReceiver
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered) if (!ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(result);
LOG.debug("Request/Response complete {}", response); LOG.debug("Request/Response succeeded {}", response);
notifier.notifyComplete(listeners, result); notifier.notifyComplete(listeners, result);
if (ordered) if (ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(result);
@ -397,6 +397,7 @@ public abstract class HttpReceiver
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered) if (!ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(result);
LOG.debug("Request/Response failed {}", response);
notifier.notifyComplete(listeners, result); notifier.notifyComplete(listeners, result);
if (ordered) if (ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(result);

View File

@ -273,8 +273,9 @@ public class HttpRedirector
private Request redirect(final Request request, Response response, Response.CompleteListener listener, URI location, String method) private Request redirect(final Request request, Response response, Response.CompleteListener listener, URI location, String method)
{ {
HttpConversation conversation = client.getConversation(request.getConversationID(), false); HttpRequest httpRequest = (HttpRequest)request;
Integer redirects = conversation == null ? Integer.valueOf(0) : (Integer)conversation.getAttribute(ATTRIBUTE); HttpConversation conversation = httpRequest.getConversation();
Integer redirects = (Integer)conversation.getAttribute(ATTRIBUTE);
if (redirects == null) if (redirects == null)
redirects = 0; redirects = 0;
if (redirects < client.getMaxRedirects()) if (redirects < client.getMaxRedirects())
@ -283,7 +284,7 @@ public class HttpRedirector
if (conversation != null) if (conversation != null)
conversation.setAttribute(ATTRIBUTE, redirects); conversation.setAttribute(ATTRIBUTE, redirects);
Request redirect = client.copyRequest(request, location); Request redirect = client.copyRequest(httpRequest, location);
// Use given method // Use given method
redirect.method(method); redirect.method(method);
@ -311,7 +312,7 @@ public class HttpRedirector
protected void fail(Request request, Response response, Throwable failure) protected void fail(Request request, Response response, Throwable failure)
{ {
HttpConversation conversation = client.getConversation(request.getConversationID(), false); HttpConversation conversation = ((HttpRequest)request).getConversation();
conversation.updateResponseListeners(null); conversation.updateResponseListeners(null);
List<Response.ResponseListener> listeners = conversation.getResponseListeners(); List<Response.ResponseListener> listeners = conversation.getResponseListeners();
notifier.notifyFailure(listeners, response, failure); notifier.notifyFailure(listeners, response, failure);

View File

@ -36,7 +36,6 @@ import java.util.Objects;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.ContentProvider; import org.eclipse.jetty.client.api.ContentProvider;
@ -55,8 +54,6 @@ import org.eclipse.jetty.util.Fields;
public class HttpRequest implements Request public class HttpRequest implements Request
{ {
private static final AtomicLong ids = new AtomicLong();
private final HttpFields headers = new HttpFields(); private final HttpFields headers = new HttpFields();
private final Fields params = new Fields(true); private final Fields params = new Fields(true);
private final Map<String, Object> attributes = new HashMap<>(); private final Map<String, Object> attributes = new HashMap<>();
@ -64,7 +61,7 @@ public class HttpRequest implements Request
private final List<Response.ResponseListener> responseListeners = new ArrayList<>(); private final List<Response.ResponseListener> responseListeners = new ArrayList<>();
private final AtomicReference<Throwable> aborted = new AtomicReference<>(); private final AtomicReference<Throwable> aborted = new AtomicReference<>();
private final HttpClient client; private final HttpClient client;
private final long conversation; private final HttpConversation conversation;
private final String host; private final String host;
private final int port; private final int port;
private URI uri; private URI uri;
@ -78,12 +75,7 @@ public class HttpRequest implements Request
private ContentProvider content; private ContentProvider content;
private boolean followRedirects; private boolean followRedirects;
public HttpRequest(HttpClient client, URI uri) protected HttpRequest(HttpClient client, HttpConversation conversation, URI uri)
{
this(client, ids.incrementAndGet(), uri);
}
protected HttpRequest(HttpClient client, long conversation, URI uri)
{ {
this.client = client; this.client = client;
this.conversation = conversation; this.conversation = conversation;
@ -100,10 +92,15 @@ public class HttpRequest implements Request
headers.put(acceptEncodingField); headers.put(acceptEncodingField);
} }
protected HttpConversation getConversation()
{
return conversation;
}
@Override @Override
public long getConversationID() public long getConversationID()
{ {
return conversation; return getConversation().getID();
} }
@Override @Override
@ -602,7 +599,7 @@ public class HttpRequest implements Request
send(this, listener); send(this, listener);
} }
private void send(Request request, Response.CompleteListener listener) private void send(HttpRequest request, Response.CompleteListener listener)
{ {
if (listener != null) if (listener != null)
responseListeners.add(listener); responseListeners.add(listener);
@ -612,13 +609,7 @@ public class HttpRequest implements Request
@Override @Override
public boolean abort(Throwable cause) public boolean abort(Throwable cause)
{ {
if (aborted.compareAndSet(null, Objects.requireNonNull(cause))) return aborted.compareAndSet(null, Objects.requireNonNull(cause)) && conversation.abort(cause);
{
// The conversation may be null if it is already completed
HttpConversation conversation = client.getConversation(getConversationID(), false);
return conversation != null && conversation.abort(cause);
}
return false;
} }
@Override @Override

View File

@ -41,6 +41,12 @@ public class HttpResponse implements Response
this.listeners = listeners; this.listeners = listeners;
} }
@Override
public Request getRequest()
{
return request;
}
public HttpVersion getVersion() public HttpVersion getVersion()
{ {
return version; return version;
@ -82,6 +88,7 @@ public class HttpResponse implements Response
} }
@Override @Override
@Deprecated
public long getConversationID() public long getConversationID()
{ {
return request.getConversationID(); return request.getConversationID();

View File

@ -48,7 +48,7 @@ import org.eclipse.jetty.util.log.Logger;
* the request has not been failed already. * the request has not been failed already.
* <p /> * <p />
* The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications * The sender state machine is updated by {@link HttpSender} from three sources: deferred content notifications
* (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, boolean)}) * (via {@link #onContent()}), 100-continue notifications (via {@link #proceed(HttpExchange, Throwable)})
* and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}). * and normal request send (via {@link #sendContent(HttpExchange, HttpContent, Callback)}).
* This state machine must guarantee that the request sending is never executed concurrently: only one of * This state machine must guarantee that the request sending is never executed concurrently: only one of
* those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}. * those sources may trigger the call to {@link #sendContent(HttpExchange, HttpContent, Callback)}.
@ -96,48 +96,63 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{ {
case IDLE: case IDLE:
{ {
if (updateSenderState(current, SenderState.SENDING)) SenderState newSenderState = SenderState.SENDING;
if (updateSenderState(current, newSenderState))
{ {
LOG.debug("Deferred content available, idle -> sending"); LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
// TODO should just call contentCallback.iterate() here. // TODO should just call contentCallback.iterate() here.
HttpContent content = this.content; HttpContent content = this.content;
content.advance(); if (content.advance())
sendContent(exchange, content, contentCallback); // TODO old style usage! sendContent(exchange, content, contentCallback); // TODO old style usage!
else if (content.isConsumed())
sendContent(exchange, content, lastCallback);
else
throw new IllegalStateException();
return; return;
} }
break; break;
} }
case SENDING: case SENDING:
{ {
if (updateSenderState(current, SenderState.SCHEDULED)) SenderState newSenderState = SenderState.SENDING_WITH_CONTENT;
if (updateSenderState(current, newSenderState))
{ {
LOG.debug("Deferred content available, sending -> scheduled"); LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
return; return;
} }
break; break;
} }
case EXPECTING: case EXPECTING:
{ {
if (updateSenderState(current, SenderState.SCHEDULED)) SenderState newSenderState = SenderState.EXPECTING_WITH_CONTENT;
if (updateSenderState(current, newSenderState))
{ {
LOG.debug("Deferred content available, expecting -> scheduled"); LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
return; return;
} }
break; break;
} }
case WAITING: case PROCEEDING:
{ {
LOG.debug("Deferred content available, waiting"); SenderState newSenderState = SenderState.PROCEEDING_WITH_CONTENT;
if (updateSenderState(current, newSenderState))
{
LOG.debug("Deferred content available, {} -> {}", current, newSenderState);
return; return;
} }
case SCHEDULED: break;
}
case SENDING_WITH_CONTENT:
case EXPECTING_WITH_CONTENT:
case PROCEEDING_WITH_CONTENT:
case WAITING:
{ {
LOG.debug("Deferred content available, scheduled"); LOG.debug("Deferred content available, {}", current);
return; return;
} }
default: default:
{ {
throw new IllegalStateException(); throw new IllegalStateException(current.toString());
} }
} }
} }
@ -156,12 +171,15 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (!queuedToBegin(request)) if (!queuedToBegin(request))
throw new IllegalStateException(); throw new IllegalStateException();
if (!updateSenderState(SenderState.IDLE, expects100Continue(request) ? SenderState.EXPECTING : SenderState.SENDING))
throw new IllegalStateException();
ContentProvider contentProvider = request.getContent(); ContentProvider contentProvider = request.getContent();
HttpContent content = this.content = new HttpContent(contentProvider); HttpContent content = this.content = new HttpContent(contentProvider);
SenderState newSenderState = SenderState.SENDING;
if (expects100Continue(request))
newSenderState = content.hasContent() ? SenderState.EXPECTING_WITH_CONTENT : SenderState.EXPECTING;
if (!updateSenderState(SenderState.IDLE, newSenderState))
throw new IllegalStateException();
// Setting the listener may trigger calls to onContent() by other // Setting the listener may trigger calls to onContent() by other
// threads so we must set it only after the sender state has been updated // threads so we must set it only after the sender state has been updated
if (contentProvider instanceof AsyncContentProvider) if (contentProvider instanceof AsyncContentProvider)
@ -232,7 +250,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
} }
default: default:
{ {
throw new IllegalStateException(); throw new IllegalStateException(current.toString());
} }
} }
} }
@ -285,7 +303,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
} }
default: default:
{ {
throw new IllegalStateException(); throw new IllegalStateException(current.toString());
} }
} }
} }
@ -394,13 +412,17 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
} }
} }
public void proceed(HttpExchange exchange, boolean proceed) public void proceed(HttpExchange exchange, Throwable failure)
{ {
if (!expects100Continue(exchange.getRequest())) if (!expects100Continue(exchange.getRequest()))
return; return;
if (proceed) if (failure != null)
{ {
anyToFailure(failure);
return;
}
while (true) while (true)
{ {
SenderState current = senderState.get(); SenderState current = senderState.get();
@ -409,58 +431,58 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case EXPECTING: case EXPECTING:
{ {
// We are still sending the headers, but we already got the 100 Continue. // We are still sending the headers, but we already got the 100 Continue.
// Move to SEND so that the commit callback can send the content. if (updateSenderState(current, SenderState.PROCEEDING))
if (!updateSenderState(current, SenderState.SENDING)) {
break; LOG.debug("Proceeding while expecting");
LOG.debug("Proceed while expecting");
return; return;
} }
break;
}
case EXPECTING_WITH_CONTENT:
{
// More deferred content was submitted to onContent(), we already
// got the 100 Continue, but we may be still sending the headers
// (for example, with SSL we may have sent the encrypted data,
// received the 100 Continue but not yet updated the decrypted
// WriteFlusher so sending more content now may result in a
// WritePendingException).
if (updateSenderState(current, SenderState.PROCEEDING_WITH_CONTENT))
{
LOG.debug("Proceeding while scheduled");
return;
}
break;
}
case WAITING: case WAITING:
{ {
// We received the 100 Continue, send the content if any. // We received the 100 Continue, now send the content if any.
// First update the sender state to be sure to be the one
// to call sendContent() since we race with onContent().
if (!updateSenderState(current, SenderState.SENDING))
break;
HttpContent content = this.content; HttpContent content = this.content;
// TODO should just call contentCallback.iterate() here. // TODO should just call contentCallback.iterate() here.
if (content.advance()) if (content.advance())
{ {
// There is content to send // There is content to send.
LOG.debug("Proceed while waiting"); if (!updateSenderState(current, SenderState.SENDING))
throw new IllegalStateException();
LOG.debug("Proceeding while waiting");
sendContent(exchange, content, contentCallback); // TODO old style usage! sendContent(exchange, content, contentCallback); // TODO old style usage!
return;
} }
else else
{ {
// No content to send yet - it's deferred. // No content to send yet - it's deferred.
// We may fail the update as onContent() moved to SCHEDULE. if (!updateSenderState(current, SenderState.IDLE))
if (!updateSenderState(SenderState.SENDING, SenderState.IDLE)) throw new IllegalStateException();
break; LOG.debug("Proceeding deferred");
LOG.debug("Proceed deferred");
}
return; return;
} }
case SCHEDULED:
{
// We lost the race with onContent() to update the state, try again
if (!updateSenderState(current, SenderState.WAITING))
throw new IllegalStateException();
LOG.debug("Proceed while scheduled");
break;
} }
default: default:
{ {
throw new IllegalStateException(); throw new IllegalStateException(current.toString());
} }
} }
} }
} }
else
{
anyToFailure(new HttpRequestException("Expectation failed", exchange.getRequest()));
}
}
public boolean abort(Throwable failure) public boolean abort(Throwable failure)
{ {
@ -547,25 +569,37 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
private enum SenderState private enum SenderState
{ {
/** /**
* {@link HttpSender} is not sending the request * {@link HttpSender} is not sending request headers nor request content
*/ */
IDLE, IDLE,
/** /**
* {@link HttpSender} is sending the request * {@link HttpSender} is sending the request header or request content
*/ */
SENDING, SENDING,
/** /**
* {@link HttpSender} is sending the headers but will wait for 100-Continue before sending the content * {@link HttpSender} is currently sending the request, and deferred content is available to be sent
*/
SENDING_WITH_CONTENT,
/**
* {@link HttpSender} is sending the headers but will wait for 100 Continue before sending the content
*/ */
EXPECTING, EXPECTING,
/** /**
* {@link HttpSender} is waiting for 100-Continue * {@link HttpSender} is currently sending the headers, will wait for 100 Continue, and deferred content is available to be sent
*/
EXPECTING_WITH_CONTENT,
/**
* {@link HttpSender} has sent the headers and is waiting for 100 Continue
*/ */
WAITING, WAITING,
/** /**
* {@link HttpSender} is currently sending the request, and deferred content is available to be sent * {@link HttpSender} is sending the headers, while 100 Continue has arrived
*/ */
SCHEDULED PROCEEDING,
/**
* {@link HttpSender} is sending the headers, while 100 Continue has arrived, and deferred content is available to be sent
*/
PROCEEDING_WITH_CONTENT
} }
private class CommitCallback implements Callback private class CommitCallback implements Callback
@ -623,34 +657,59 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (content.advance()) if (content.advance())
{ {
sendContent(exchange, content, contentCallback); // TODO old style usage! sendContent(exchange, content, contentCallback); // TODO old style usage!
return;
} }
else else
{ {
if (content.isConsumed()) if (content.isConsumed())
{ {
sendContent(exchange, content, lastCallback); sendContent(exchange, content, lastCallback);
return;
} }
else else
{ {
if (!updateSenderState(current, SenderState.IDLE)) if (updateSenderState(current, SenderState.IDLE))
break; {
LOG.debug("Waiting for deferred content for {}", request); LOG.debug("Waiting for deferred content for {}", request);
}
}
return; return;
} }
break;
}
}
}
case SENDING_WITH_CONTENT:
{
// We have deferred content to send.
updateSenderState(current, SenderState.SENDING);
break;
}
case EXPECTING: case EXPECTING:
{ {
// Wait for the 100 Continue response // We sent the headers, wait for the 100 Continue response.
if (!updateSenderState(current, SenderState.WAITING)) if (updateSenderState(current, SenderState.WAITING))
return;
break; break;
return;
} }
case SCHEDULED: case EXPECTING_WITH_CONTENT:
{ {
if (expects100Continue(request)) // We sent the headers, we have deferred content to send,
// wait for the 100 Continue response.
if (updateSenderState(current, SenderState.WAITING))
return; return;
// We have deferred content to send. break;
}
case PROCEEDING:
{
// We sent the headers, we have the 100 Continue response,
// we have no content to send.
if (updateSenderState(current, SenderState.IDLE))
return;
break;
}
case PROCEEDING_WITH_CONTENT:
{
// We sent the headers, we have the 100 Continue response,
// we have deferred content to send.
updateSenderState(current, SenderState.SENDING); updateSenderState(current, SenderState.SENDING);
break; break;
} }
@ -717,7 +776,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
} }
break; break;
} }
case SCHEDULED: case SENDING_WITH_CONTENT:
{ {
if (updateSenderState(current, SenderState.SENDING)) if (updateSenderState(current, SenderState.SENDING))
{ {

View File

@ -102,8 +102,9 @@ public abstract class MultiplexHttpDestination<C extends Connection> extends Htt
Throwable cause = request.getAbortCause(); Throwable cause = request.getAbortCause();
if (cause != null) if (cause != null)
{ {
LOG.debug("Abort before processing {}: {}", exchange, cause); // If we have a non-null abort cause, it means that someone
abort(exchange, cause); // else has already aborted and notified, nothing do to here.
LOG.debug("Aborted before processing {}: {}", exchange, cause);
} }
else else
{ {

View File

@ -114,7 +114,8 @@ public abstract class PoolingHttpDestination<C extends Connection> extends HttpD
Throwable cause = request.getAbortCause(); Throwable cause = request.getAbortCause();
if (cause != null) if (cause != null)
{ {
abort(exchange, cause); // If we have a non-null abort cause, it means that someone
// else has already aborted and notified, nothing do to here.
LOG.debug("Aborted before processing {}: {}", exchange, cause); LOG.debug("Aborted before processing {}: {}", exchange, cause);
} }
else else

View File

@ -237,9 +237,7 @@ public class ResponseNotifier
public void forwardSuccessComplete(List<Response.ResponseListener> listeners, Request request, Response response) public void forwardSuccessComplete(List<Response.ResponseListener> listeners, Request request, Response response)
{ {
HttpConversation conversation = client.getConversation(request.getConversationID(), false);
forwardSuccess(listeners, response); forwardSuccess(listeners, response);
conversation.complete();
notifyComplete(listeners, new Result(request, response)); notifyComplete(listeners, new Result(request, response));
} }
@ -260,9 +258,7 @@ public class ResponseNotifier
public void forwardFailureComplete(List<Response.ResponseListener> listeners, Request request, Throwable requestFailure, Response response, Throwable responseFailure) public void forwardFailureComplete(List<Response.ResponseListener> listeners, Request request, Throwable requestFailure, Response response, Throwable responseFailure)
{ {
HttpConversation conversation = client.getConversation(request.getConversationID(), false);
forwardFailure(listeners, response, responseFailure); forwardFailure(listeners, response, responseFailure);
conversation.complete();
notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure)); notifyComplete(listeners, new Result(request, requestFailure, response, responseFailure));
} }
} }

View File

@ -51,7 +51,9 @@ public interface Request
{ {
/** /**
* @return the conversation id * @return the conversation id
* @deprecated do not use this method anymore
*/ */
@Deprecated
long getConversationID(); long getConversationID();
/** /**

View File

@ -40,8 +40,15 @@ import org.eclipse.jetty.http.HttpVersion;
public interface Response public interface Response
{ {
/** /**
* @return the conversation id * @return the request associated with this response
*/ */
Request getRequest();
/**
* @return the conversation id
* @deprecated do not use this method anymore
*/
@Deprecated
long getConversationID(); long getConversationID();
/** /**

View File

@ -55,9 +55,9 @@ public class HttpChannelOverHTTP extends HttpChannel
} }
@Override @Override
public void proceed(HttpExchange exchange, boolean proceed) public void proceed(HttpExchange exchange, Throwable failure)
{ {
sender.proceed(exchange, proceed); sender.proceed(exchange, failure);
} }
@Override @Override

View File

@ -109,7 +109,7 @@ public class HttpSenderOverHTTP extends HttpSender
} }
default: default:
{ {
throw new IllegalStateException(); throw new IllegalStateException(result.toString());
} }
} }
} }

View File

@ -18,15 +18,10 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import static junit.framework.Assert.fail;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
import java.io.IOException; import java.io.IOException;
import java.nio.channels.ClosedChannelException; import java.nio.channels.ClosedChannelException;
import java.security.cert.CertificateException; import java.security.cert.CertificateException;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import javax.net.ssl.SSLHandshakeException; import javax.net.ssl.SSLHandshakeException;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -43,6 +38,10 @@ import org.junit.After;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import static junit.framework.Assert.fail;
import static org.hamcrest.Matchers.instanceOf;
import static org.junit.Assert.assertThat;
/** /**
* This test class runs tests to make sure that hostname verification (http://www.ietf.org/rfc/rfc2818.txt * This test class runs tests to make sure that hostname verification (http://www.ietf.org/rfc/rfc2818.txt
* section 3.1) is configurable in SslContextFactory and works as expected. * section 3.1) is configurable in SslContextFactory and works as expected.
@ -122,7 +121,7 @@ public class HostnameVerificationTest
if (cause instanceof SSLHandshakeException) if (cause instanceof SSLHandshakeException)
assertThat(cause.getCause().getCause(), instanceOf(CertificateException.class)); assertThat(cause.getCause().getCause(), instanceOf(CertificateException.class));
else else
assertThat(cause, instanceOf(ClosedChannelException.class)); assertThat(cause.getCause(), instanceOf(ClosedChannelException.class));
} }
} }

View File

@ -25,7 +25,6 @@ import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -133,7 +132,6 @@ public class HttpClientAuthenticationTest extends AbstractHttpClientServerTest
Assert.assertEquals(401, response.getStatus()); Assert.assertEquals(401, response.getStatus());
Assert.assertTrue(requests.get().await(5, TimeUnit.SECONDS)); Assert.assertTrue(requests.get().await(5, TimeUnit.SECONDS));
client.getRequestListeners().remove(requestListener); client.getRequestListeners().remove(requestListener);
Assert.assertNull(client.getConversation(request.getConversationID(), false));
authenticationStore.addAuthentication(authentication); authenticationStore.addAuthentication(authentication);

View File

@ -26,7 +26,6 @@ import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.ServletInputStream; import javax.servlet.ServletInputStream;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -610,7 +609,7 @@ public class HttpClientContinueTest extends AbstractHttpClientServerTest
}); });
final byte[] chunk1 = new byte[]{0, 1, 2, 3}; final byte[] chunk1 = new byte[]{0, 1, 2, 3};
final byte[] chunk2 = new byte[]{4, 5, 6, 7}; final byte[] chunk2 = new byte[]{4, 5, 6};
final byte[] data = new byte[chunk1.length + chunk2.length]; final byte[] data = new byte[chunk1.length + chunk2.length];
System.arraycopy(chunk1, 0, data, 0, chunk1.length); System.arraycopy(chunk1, 0, data, 0, chunk1.length);
System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length); System.arraycopy(chunk2, 0, data, chunk1.length, chunk2.length);

View File

@ -20,12 +20,15 @@ package org.eclipse.jetty.client;
import java.io.ByteArrayInputStream; import java.io.ByteArrayInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor; import java.util.concurrent.Executor;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.ssl.SSLEngine; import javax.net.ssl.SSLEngine;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
@ -52,6 +55,7 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.hamcrest.Matchers; import org.hamcrest.Matchers;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Assume;
import org.junit.Test; import org.junit.Test;
public class HttpClientTimeoutTest extends AbstractHttpClientServerTest public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
@ -295,6 +299,90 @@ public class HttpClientTimeoutTest extends AbstractHttpClientServerTest
} }
} }
@Slow
@Test
public void testConnectTimeoutFailsRequest() throws Exception
{
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 1000;
assumeConnectTimeout(host, port, connectTimeout);
start(new EmptyServerHandler());
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final CountDownLatch latch = new CountDownLatch(1);
Request request = client.newRequest(host, port);
request.scheme(scheme)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
if (result.isFailed())
latch.countDown();
}
});
Assert.assertTrue(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertNotNull(request.getAbortCause());
}
@Slow
@Test
public void testConnectTimeoutIsCancelledByShorterTimeout() throws Exception
{
String host = "10.255.255.1";
int port = 80;
int connectTimeout = 2000;
assumeConnectTimeout(host, port, connectTimeout);
start(new EmptyServerHandler());
client.stop();
client.setConnectTimeout(connectTimeout);
client.start();
final AtomicInteger completes = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
Request request = client.newRequest(host, port);
request.scheme(scheme)
.timeout(connectTimeout / 2, TimeUnit.MILLISECONDS)
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completes.incrementAndGet();
latch.countDown();
}
});
Assert.assertFalse(latch.await(2 * connectTimeout, TimeUnit.MILLISECONDS));
Assert.assertEquals(1, completes.get());
Assert.assertNotNull(request.getAbortCause());
}
private void assumeConnectTimeout(String host, int port, int connectTimeout) throws IOException
{
try (Socket socket = new Socket())
{
// Try to connect to a private address in the 10.x.y.z range.
// These addresses are usually not routed, so an attempt to
// connect to them will hang the connection attempt, which is
// what we want to simulate in this test.
socket.connect(new InetSocketAddress(host, port), connectTimeout);
// Abort the test if we can connect.
Assume.assumeTrue(false);
}
catch (SocketTimeoutException x)
{
// Expected timeout during connect, continue the test.
Assume.assumeTrue(true);
}
}
private class TimeoutHandler extends AbstractHandler private class TimeoutHandler extends AbstractHandler
{ {
private final long timeout; private final long timeout;

View File

@ -25,12 +25,10 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.client.api.ContentResponse;
import org.eclipse.jetty.client.api.Request; import org.eclipse.jetty.client.api.Request;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
@ -55,6 +53,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
start(new EmptyServerHandler()); start(new EmptyServerHandler());
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicBoolean begin = new AtomicBoolean(); final AtomicBoolean begin = new AtomicBoolean();
try try
{ {
@ -65,7 +65,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override @Override
public void onQueued(Request request) public void onQueued(Request request)
{ {
request.abort(cause); aborted.set(request.abort(cause));
latch.countDown();
} }
@Override @Override
@ -80,6 +81,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
Assert.assertFalse(begin.get()); Assert.assertFalse(begin.get());
} }
@ -92,7 +95,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
start(new EmptyServerHandler()); start(new EmptyServerHandler());
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final CountDownLatch aborted = new CountDownLatch(1); final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch committed = new CountDownLatch(1); final CountDownLatch committed = new CountDownLatch(1);
try try
{ {
@ -103,8 +107,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override @Override
public void onBegin(Request request) public void onBegin(Request request)
{ {
if (request.abort(cause)) aborted.set(request.abort(cause));
aborted.countDown(); latch.countDown();
} }
@Override @Override
@ -119,8 +123,9 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS));
Assert.assertFalse(committed.await(1, TimeUnit.SECONDS)); Assert.assertFalse(committed.await(1, TimeUnit.SECONDS));
} }
} }
@ -132,7 +137,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
start(new EmptyServerHandler()); start(new EmptyServerHandler());
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final CountDownLatch aborted = new CountDownLatch(1); final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
final CountDownLatch committed = new CountDownLatch(1); final CountDownLatch committed = new CountDownLatch(1);
try try
{ {
@ -143,8 +149,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override @Override
public void onHeaders(Request request) public void onHeaders(Request request)
{ {
if (request.abort(cause)) aborted.set(request.abort(cause));
aborted.countDown(); latch.countDown();
} }
@Override @Override
@ -159,8 +165,9 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS));
Assert.assertFalse(committed.await(1, TimeUnit.SECONDS)); Assert.assertFalse(committed.await(1, TimeUnit.SECONDS));
} }
} }
@ -171,33 +178,34 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
start(new EmptyServerHandler()); start(new EmptyServerHandler());
// Test can behave in 2 ways: // Test can behave in 2 ways:
// A) the request is failed before the response arrived, then we get an ExecutionException // A) the request is failed before the response arrived
// B) the request is failed after the response arrived, we get the 200 OK // B) the request is failed after the response arrived
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final CountDownLatch aborted = new CountDownLatch(1); final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
try try
{ {
ContentResponse response = client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.onRequestCommit(new Request.CommitListener() .onRequestCommit(new Request.CommitListener()
{ {
@Override @Override
public void onCommit(Request request) public void onCommit(Request request)
{ {
if (request.abort(cause)) aborted.set(request.abort(cause));
aborted.countDown(); latch.countDown();
} }
}) })
.timeout(5, TimeUnit.SECONDS) .timeout(5, TimeUnit.SECONDS)
.send(); .send();
Assert.assertEquals(200, response.getStatus()); Assert.fail();
Assert.assertFalse(aborted.await(1, TimeUnit.SECONDS));
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
Assert.assertTrue(aborted.await(5, TimeUnit.SECONDS));
} }
} }
@ -224,6 +232,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}); });
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
try try
{ {
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
@ -233,7 +243,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override @Override
public void onCommit(Request request) public void onCommit(Request request)
{ {
request.abort(cause); aborted.set(request.abort(cause));
latch.countDown();
} }
}) })
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1})) .content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
@ -250,6 +261,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
} }
} }
@ -268,6 +281,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
}); });
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
try try
{ {
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
@ -277,7 +292,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
@Override @Override
public void onContent(Request request, ByteBuffer content) public void onContent(Request request, ByteBuffer content)
{ {
request.abort(cause); aborted.set(request.abort(cause));
latch.countDown();
} }
}) })
.content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1})) .content(new ByteBufferContentProvider(ByteBuffer.wrap(new byte[]{0}), ByteBuffer.wrap(new byte[]{1}))
@ -294,6 +310,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
} }
} }
@ -372,6 +390,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
.scheme(scheme); .scheme(scheme);
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
new Thread() new Thread()
{ {
@Override @Override
@ -380,7 +400,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
try try
{ {
TimeUnit.MILLISECONDS.sleep(delay); TimeUnit.MILLISECONDS.sleep(delay);
request.abort(cause); aborted.set(request.abort(cause));
latch.countDown();
} }
catch (InterruptedException x) catch (InterruptedException x)
{ {
@ -395,6 +416,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
} }
} }
@ -458,7 +481,14 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
}); });
// The test may fail to abort the request in this way:
// T1 aborts the request, which aborts the sender, which shuts down the output;
// server reads -1 and closes; T2 reads -1 and the receiver fails the response with an EOFException;
// T1 tries to abort the receiver, but it's already failed.
final Throwable cause = new Exception(); final Throwable cause = new Exception();
final AtomicBoolean aborted = new AtomicBoolean();
final CountDownLatch latch = new CountDownLatch(1);
client.getProtocolHandlers().clear(); client.getProtocolHandlers().clear();
client.getProtocolHandlers().add(new RedirectProtocolHandler(client) client.getProtocolHandlers().add(new RedirectProtocolHandler(client)
{ {
@ -467,7 +497,10 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
{ {
// Abort the request after the 3xx response but before issuing the next request // Abort the request after the 3xx response but before issuing the next request
if (!result.isFailed()) if (!result.isFailed())
result.getRequest().abort(cause); {
aborted.set(result.getRequest().abort(cause));
latch.countDown();
}
super.onComplete(result); super.onComplete(result);
} }
}); });
@ -483,6 +516,8 @@ public class HttpRequestAbortTest extends AbstractHttpClientServerTest
} }
catch (ExecutionException x) catch (ExecutionException x)
{ {
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
if (aborted.get())
Assert.assertSame(cause, x.getCause()); Assert.assertSame(cause, x.getCause());
} }
} }

View File

@ -191,6 +191,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
final CountDownLatch failureLatch = new CountDownLatch(1); final CountDownLatch failureLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.path("/one")
.onRequestQueued(new Request.QueuedListener() .onRequestQueued(new Request.QueuedListener()
{ {
@Override @Override
@ -199,6 +200,7 @@ public class HttpDestinationOverHTTPTest extends AbstractHttpClientServerTest
// This request exceeds the maximum queued, should fail // This request exceeds the maximum queued, should fail
client.newRequest("localhost", connector.getLocalPort()) client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme) .scheme(scheme)
.path("/two")
.send(new Response.CompleteListener() .send(new Response.CompleteListener()
{ {
@Override @Override

View File

@ -61,9 +61,9 @@ public class HttpChannelOverSPDY extends HttpChannel
} }
@Override @Override
public void proceed(HttpExchange exchange, boolean proceed) public void proceed(HttpExchange exchange, Throwable failure)
{ {
sender.proceed(exchange, proceed); sender.proceed(exchange, failure);
} }
@Override @Override