461499 - ConnectionPool may leak connections.

Made associate(), disassociate() and abort() atomic operations using
the HttpExchange state to coordinate atomicity.
In this way, it's not possible to associate a HttpChannel and a
HttpExchange if the latter has been aborted.
This commit is contained in:
Simone Bordet 2015-03-24 16:19:45 +01:00
parent 3b31d6aa12
commit 1dc66b72dd
12 changed files with 580 additions and 365 deletions

View File

@ -18,74 +18,126 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.SpinLock;
public abstract class HttpChannel public abstract class HttpChannel
{ {
protected static final Logger LOG = Log.getLogger(HttpChannel.class); protected static final Logger LOG = Log.getLogger(HttpChannel.class);
private final AtomicReference<HttpExchange> exchange = new AtomicReference<>(); private final SpinLock _lock = new SpinLock();
private final HttpDestination destination; private final HttpDestination _destination;
private HttpExchange _exchange;
protected HttpChannel(HttpDestination destination) protected HttpChannel(HttpDestination destination)
{ {
this.destination = destination; this._destination = destination;
} }
public HttpDestination getHttpDestination() public HttpDestination getHttpDestination()
{ {
return destination; return _destination;
} }
public void associate(HttpExchange exchange) /**
* <p>Associates the given {@code exchange} to this channel in order to be sent over the network.</p>
* <p>If the association is successful, the exchange can be sent. Otherwise, the channel must be
* disposed because whoever terminated the exchange did not do it - it did not have the channel yet.</p>
*
* @param exchange the exchange to associate
* @return true if the association was successful, false otherwise
*/
public boolean associate(HttpExchange exchange)
{ {
if (this.exchange.compareAndSet(null, exchange)) boolean result = false;
boolean abort = true;
try (SpinLock.Lock lock = _lock.lock())
{ {
exchange.associate(this); if (_exchange == null)
if (LOG.isDebugEnabled()) {
LOG.debug("{} associated to {}", exchange, this); abort = false;
result = exchange.associate(this);
if (result)
_exchange = exchange;
}
} }
else
{ if (abort)
exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported")); exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported"));
}
if (LOG.isDebugEnabled())
LOG.debug("{} associated {} to {}", exchange, result, this);
return result;
} }
public HttpExchange disassociate() public boolean disassociate(HttpExchange exchange)
{ {
HttpExchange exchange = this.exchange.getAndSet(null); boolean result = false;
if (exchange != null) try (SpinLock.Lock lock = _lock.lock())
exchange.disassociate(this); {
HttpExchange existing = _exchange;
_exchange = null;
if (existing == exchange)
{
existing.disassociate(this);
result = true;
}
}
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} disassociated from {}", exchange, this); LOG.debug("{} disassociated {} from {}", exchange, result, this);
return exchange; return result;
} }
public HttpExchange getHttpExchange() public HttpExchange getHttpExchange()
{ {
return exchange.get(); try (SpinLock.Lock lock = _lock.lock())
{
return _exchange;
}
} }
protected abstract HttpSender getHttpSender();
protected abstract HttpReceiver getHttpReceiver();
public abstract void send(); public abstract void send();
public abstract void proceed(HttpExchange exchange, Throwable failure); public abstract void release();
public abstract boolean abort(Throwable cause); public void proceed(HttpExchange exchange, Throwable failure)
public abstract boolean abortResponse(Throwable cause);
public void exchangeTerminated(Result result)
{ {
disassociate(); getHttpSender().proceed(exchange, failure);
}
public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure)
{
boolean requestAborted = false;
if (requestFailure != null)
requestAborted = getHttpSender().abort(exchange, requestFailure);
boolean responseAborted = false;
if (responseFailure != null)
responseAborted = abortResponse(exchange, responseFailure);
return requestAborted || responseAborted;
}
public boolean abortResponse(HttpExchange exchange, Throwable failure)
{
return getHttpReceiver().abort(exchange, failure);
}
public void exchangeTerminated(HttpExchange exchange, Result result)
{
disassociate(exchange);
} }
@Override @Override
public String toString() public String toString()
{ {
return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), exchange); return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), getHttpExchange());
} }
} }

View File

@ -19,7 +19,6 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import java.util.List; import java.util.List;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
@ -31,21 +30,14 @@ public class HttpExchange
{ {
private static final Logger LOG = Log.getLogger(HttpExchange.class); private static final Logger LOG = Log.getLogger(HttpExchange.class);
private final AtomicReference<HttpChannel> channel = new AtomicReference<>();
private final HttpDestination destination; private final HttpDestination destination;
private final HttpRequest request; private final HttpRequest request;
private final List<Response.ResponseListener> listeners; private final List<Response.ResponseListener> listeners;
private final HttpResponse response; private final HttpResponse response;
enum State
{
PENDING, COMPLETED, TERMINATED
}
;
private final SpinLock _lock = new SpinLock(); private final SpinLock _lock = new SpinLock();
private State requestState = State.PENDING; private State requestState = State.PENDING;
private State responseState = State.PENDING; private State responseState = State.PENDING;
private HttpChannel _channel;
private Throwable requestFailure; private Throwable requestFailure;
private Throwable responseFailure; private Throwable responseFailure;
@ -96,120 +88,187 @@ public class HttpExchange
} }
} }
public void associate(HttpChannel channel) /**
* <p>Associates the given {@code channel} to this exchange.</p>
* <p>Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.</p>
*
* @param channel the channel to associate to this exchange
* @return true if the channel could be associated, false otherwise
*/
boolean associate(HttpChannel channel)
{ {
if (!this.channel.compareAndSet(null, channel)) boolean result = false;
request.abort(new IllegalStateException()); boolean abort = false;
try (SpinLock.Lock lock = _lock.lock())
{
// Only associate if the exchange state is initial,
// as the exchange could be already failed.
if (requestState == State.PENDING && responseState == State.PENDING)
{
abort = _channel != null;
if (!abort)
{
_channel = channel;
result = true;
}
}
}
if (abort)
request.abort(new IllegalStateException(toString()));
return result;
} }
public void disassociate(HttpChannel channel) void disassociate(HttpChannel channel)
{ {
if (!this.channel.compareAndSet(channel, null)) boolean abort = false;
request.abort(new IllegalStateException()); try (SpinLock.Lock lock = _lock.lock())
{
if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED)
abort = true;
_channel = null;
}
if (abort)
request.abort(new IllegalStateException(toString()));
} }
public boolean requestComplete() private HttpChannel getHttpChannel()
{ {
try (SpinLock.Lock lock = _lock.lock()) try (SpinLock.Lock lock = _lock.lock())
{ {
if (requestState != State.PENDING) return _channel;
return false; }
}
public boolean requestComplete(Throwable failure)
{
try (SpinLock.Lock lock = _lock.lock())
{
return completeRequest(failure);
}
}
private boolean completeRequest(Throwable failure)
{
if (requestState == State.PENDING)
{
requestState = State.COMPLETED; requestState = State.COMPLETED;
return true;
}
}
public boolean responseComplete()
{
try (SpinLock.Lock lock = _lock.lock())
{
if (responseState != State.PENDING)
return false;
responseState = State.COMPLETED;
return true;
}
}
public Result terminateRequest(Throwable failure)
{
try (SpinLock.Lock lock = _lock.lock())
{
requestState = State.TERMINATED;
requestFailure = failure; requestFailure = failure;
if (State.TERMINATED.equals(responseState)) return true;
return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
} }
return null; return false;
} }
public Result terminateResponse(Throwable failure) public boolean responseComplete(Throwable failure)
{ {
try (SpinLock.Lock lock = _lock.lock()) try (SpinLock.Lock lock = _lock.lock())
{ {
responseState = State.TERMINATED; return completeResponse(failure);
responseFailure = failure;
if (requestState == State.TERMINATED)
return new Result(getRequest(), requestFailure, getResponse(), responseFailure);
} }
return null;
} }
private boolean completeResponse(Throwable failure)
public boolean abort(Throwable cause)
{ {
if (responseState == State.PENDING)
{
responseState = State.COMPLETED;
responseFailure = failure;
return true;
}
return false;
}
public Result terminateRequest()
{
Result result = null;
try (SpinLock.Lock lock = _lock.lock())
{
if (requestState == State.COMPLETED)
requestState = State.TERMINATED;
if (requestState == State.TERMINATED && responseState == State.TERMINATED)
result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
if (LOG.isDebugEnabled())
LOG.debug("Terminated request for {}, result: {}", this, result);
return result;
}
public Result terminateResponse()
{
Result result = null;
try (SpinLock.Lock lock = _lock.lock())
{
if (responseState == State.COMPLETED)
responseState = State.TERMINATED;
if (requestState == State.TERMINATED && responseState == State.TERMINATED)
result = new Result(getRequest(), requestFailure, getResponse(), responseFailure);
}
if (LOG.isDebugEnabled())
LOG.debug("Terminated response for {}, result: {}", this, result);
return result;
}
public boolean abort(Throwable failure)
{
// Atomically change the state of this exchange to be completed.
// This will avoid that this exchange can be associated to a channel.
boolean abortRequest;
boolean abortResponse;
try (SpinLock.Lock lock = _lock.lock())
{
abortRequest = completeRequest(failure);
abortResponse = completeResponse(failure);
}
if (LOG.isDebugEnabled())
LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure);
if (!abortRequest && !abortResponse)
return false;
// We failed this exchange, deal with it.
// Case #1: exchange was in the destination queue.
if (destination.remove(this)) if (destination.remove(this))
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Aborting while queued {}: {}", this, cause); LOG.debug("Aborting while queued {}: {}", this, failure);
return fail(cause); notifyFailureComplete(failure);
}
else
{
HttpChannel channel = this.channel.get();
if (channel == null)
return fail(cause);
boolean aborted = channel.abort(cause);
if (LOG.isDebugEnabled())
LOG.debug("Aborted ({}) while active {}: {}", aborted, this, cause);
return aborted;
}
}
private boolean fail(Throwable cause)
{
boolean notify = false;
try (SpinLock.Lock lock = _lock.lock())
{
if (requestState != State.TERMINATED)
{
requestState = State.TERMINATED;
notify = true;
requestFailure = cause;
}
if (responseState != State.TERMINATED)
{
responseState = State.TERMINATED;
notify = true;
responseFailure = cause;
}
}
if (notify)
{
if (LOG.isDebugEnabled())
LOG.debug("Failing {}: {}", this, cause);
destination.getRequestNotifier().notifyFailure(request, cause);
List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
ResponseNotifier responseNotifier = destination.getResponseNotifier();
responseNotifier.notifyFailure(listeners, response, cause);
responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause));
return true; return true;
} }
else
HttpChannel channel = getHttpChannel();
if (channel == null)
{ {
return false; // Case #2: exchange was not yet associated.
// Because this exchange is failed, when associate() is called
// it will return false, and the caller will dispose the channel.
if (LOG.isDebugEnabled())
LOG.debug("Aborted before association {}: {}", this, failure);
notifyFailureComplete(failure);
return true;
} }
// Case #3: exchange was already associated.
boolean aborted = channel.abort(this, abortRequest ? failure : null, abortResponse ? failure : null);
if (LOG.isDebugEnabled())
LOG.debug("Aborted ({}) while active {}: {}", aborted, this, failure);
return aborted;
}
private void notifyFailureComplete(Throwable failure)
{
destination.getRequestNotifier().notifyFailure(request, failure);
List<Response.ResponseListener> listeners = getConversation().getResponseListeners();
ResponseNotifier responseNotifier = destination.getResponseNotifier();
responseNotifier.notifyFailure(listeners, response, failure);
responseNotifier.notifyComplete(listeners, new Result(request, failure, response, failure));
} }
public void resetResponse() public void resetResponse()
@ -223,7 +282,7 @@ public class HttpExchange
public void proceed(Throwable failure) public void proceed(Throwable failure)
{ {
HttpChannel channel = this.channel.get(); HttpChannel channel = getHttpChannel();
if (channel != null) if (channel != null)
channel.proceed(this, failure); channel.proceed(this, failure);
} }
@ -233,11 +292,16 @@ public class HttpExchange
{ {
try (SpinLock.Lock lock = _lock.lock()) try (SpinLock.Lock lock = _lock.lock())
{ {
return String.format("%s@%x req=%s/%s res=%s/%s", return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h",
HttpExchange.class.getSimpleName(), HttpExchange.class.getSimpleName(),
hashCode(), hashCode(),
requestState, requestFailure, requestState, requestFailure, requestFailure,
responseState, responseFailure); responseState, responseFailure, responseFailure);
} }
} }
private enum State
{
PENDING, COMPLETED, TERMINATED
}
} }

View File

@ -129,10 +129,11 @@ public abstract class HttpReceiver
ResponseNotifier notifier = destination.getResponseNotifier(); ResponseNotifier notifier = destination.getResponseNotifier();
notifier.notifyBegin(conversation.getResponseListeners(), response); notifier.notifyBegin(conversation.getResponseListeners(), response);
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN))
terminateResponse(exchange, failure); return true;
return true; terminateResponse(exchange);
return false;
} }
/** /**
@ -193,10 +194,11 @@ public abstract class HttpReceiver
} }
} }
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER))
terminateResponse(exchange, failure); return true;
return true; terminateResponse(exchange);
return false;
} }
protected void storeCookie(URI uri, HttpField field) protected void storeCookie(URI uri, HttpField field)
@ -269,10 +271,11 @@ public abstract class HttpReceiver
} }
} }
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS))
terminateResponse(exchange, failure); return true;
return true; terminateResponse(exchange);
return false;
} }
/** /**
@ -343,10 +346,11 @@ public abstract class HttpReceiver
} }
} }
if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT))
terminateResponse(exchange, failure); return true;
return true; terminateResponse(exchange);
return false;
} }
/** /**
@ -362,7 +366,7 @@ public abstract class HttpReceiver
{ {
// Mark atomically the response as completed, with respect // Mark atomically the response as completed, with respect
// to concurrency between response success and response failure. // to concurrency between response success and response failure.
boolean completed = exchange.responseComplete(); boolean completed = exchange.responseComplete(null);
if (!completed) if (!completed)
return false; return false;
@ -371,12 +375,13 @@ public abstract class HttpReceiver
// Reset to be ready for another response. // Reset to be ready for another response.
reset(); reset();
// Mark atomically the response as terminated and succeeded, // Mark atomically the response as terminated, with
// with respect to concurrency between request and response. // respect to concurrency between request and response.
Result result = exchange.terminateResponse(null); Result result = exchange.terminateResponse();
// It is important to notify *after* we reset and terminate // Notify *after* resetting and terminating, because
// because the notification may trigger another request/response. // the notification may trigger another request/response,
// or the reset of the response in case of 100-Continue.
HttpResponse response = exchange.getResponse(); HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Response success {}", response); LOG.debug("Response success {}", response);
@ -409,60 +414,15 @@ public abstract class HttpReceiver
// Mark atomically the response as completed, with respect // Mark atomically the response as completed, with respect
// to concurrency between response success and response failure. // to concurrency between response success and response failure.
boolean completed = exchange.responseComplete(); if (exchange.responseComplete(failure))
if (!completed) return abort(exchange, failure);
return false;
this.failure = failure; return false;
// Update the state to avoid more response processing.
boolean fail;
while (true)
{
ResponseState current = responseState.get();
if (updateResponseState(current, ResponseState.FAILURE))
{
fail = current != ResponseState.TRANSIENT;
break;
}
}
dispose();
Result result = failResponse(exchange, failure);
if (fail)
{
terminateResponse(exchange, result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
}
return true;
} }
private Result failResponse(HttpExchange exchange, Throwable failure) private void terminateResponse(HttpExchange exchange)
{ {
// Mark atomically the response as terminated and failed, Result result = exchange.terminateResponse();
// with respect to concurrency between request and response.
Result result = exchange.terminateResponse(failure);
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
return result;
}
private void terminateResponse(HttpExchange exchange, Throwable failure)
{
Result result = failResponse(exchange, failure);
terminateResponse(exchange, result); terminateResponse(exchange, result);
} }
@ -477,14 +437,14 @@ public abstract class HttpReceiver
{ {
boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering();
if (!ordered) if (!ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners(); List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyComplete(listeners, result); notifier.notifyComplete(listeners, result);
if (ordered) if (ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(exchange, result);
} }
} }
@ -512,9 +472,56 @@ public abstract class HttpReceiver
decoder = null; decoder = null;
} }
public boolean abort(Throwable cause) public boolean abort(HttpExchange exchange, Throwable failure)
{ {
return responseFailure(cause); // Update the state to avoid more response processing.
boolean terminate;
out: while (true)
{
ResponseState current = responseState.get();
switch (current)
{
case FAILURE:
{
return false;
}
default:
{
if (updateResponseState(current, ResponseState.FAILURE))
{
terminate = current != ResponseState.TRANSIENT;
break out;
}
break;
}
}
}
this.failure = failure;
dispose();
HttpResponse response = exchange.getResponse();
if (LOG.isDebugEnabled())
LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure);
List<Response.ResponseListener> listeners = exchange.getConversation().getResponseListeners();
ResponseNotifier notifier = getHttpDestination().getResponseNotifier();
notifier.notifyFailure(listeners, response, failure);
if (terminate)
{
// Mark atomically the response as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateResponse();
terminateResponse(exchange, result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: response termination skipped, performed by helpers");
}
return true;
} }
private boolean updateResponseState(ResponseState from, ResponseState to) private boolean updateResponseState(ResponseState from, ResponseState to)

View File

@ -162,10 +162,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
public void send(HttpExchange exchange) public void send(HttpExchange exchange)
{ {
Request request = exchange.getRequest(); if (!queuedToBegin(exchange))
if (!queuedToBegin(request))
return; return;
Request request = exchange.getRequest();
ContentProvider contentProvider = request.getContent(); ContentProvider contentProvider = request.getContent();
HttpContent content = this.content = new HttpContent(contentProvider); HttpContent content = this.content = new HttpContent(contentProvider);
@ -198,7 +198,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (contentProvider instanceof AsyncContentProvider) if (contentProvider instanceof AsyncContentProvider)
((AsyncContentProvider)contentProvider).setListener(this); ((AsyncContentProvider)contentProvider).setListener(this);
if (!beginToHeaders(request)) if (!beginToHeaders(exchange))
return; return;
sendHeaders(exchange, content, commitCallback); sendHeaders(exchange, content, commitCallback);
@ -209,46 +209,61 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()); return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString());
} }
protected boolean queuedToBegin(Request request) protected boolean queuedToBegin(HttpExchange exchange)
{ {
if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT)) if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT))
return false; return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request begin {}", request); LOG.debug("Request begin {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyBegin(request); notifier.notifyBegin(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
terminateRequest(getHttpExchange(), failure); if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN))
return true; return true;
terminateRequest(exchange);
return false;
} }
protected boolean beginToHeaders(Request request) protected boolean beginToHeaders(HttpExchange exchange)
{ {
if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT)) if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT))
return false; return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim()); LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim());
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyHeaders(request); notifier.notifyHeaders(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
terminateRequest(getHttpExchange(), failure); if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS))
return true; return true;
terminateRequest(exchange);
return false;
} }
protected boolean headersToCommit(Request request) protected boolean headersToCommit(HttpExchange exchange)
{ {
if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT)) if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT))
return false; return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request committed {}", request); LOG.debug("Request committed {}", request);
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyCommit(request); notifier.notifyCommit(request);
if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
terminateRequest(getHttpExchange(), failure); if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT))
return true; return true;
terminateRequest(exchange);
return false;
} }
protected boolean someToContent(Request request, ByteBuffer content) protected boolean someToContent(HttpExchange exchange, ByteBuffer content)
{ {
RequestState current = requestState.get(); RequestState current = requestState.get();
switch (current) switch (current)
@ -256,15 +271,20 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
case COMMIT: case COMMIT:
case CONTENT: case CONTENT:
{ {
if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT)) if (!updateRequestState(current, RequestState.TRANSIENT))
return false; return false;
Request request = exchange.getRequest();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content)); LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content));
RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier();
notifier.notifyContent(request, content); notifier.notifyContent(request, content);
if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT))
terminateRequest(getHttpExchange(), failure); if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT))
return true; return true;
terminateRequest(exchange);
return false;
} }
default: default:
{ {
@ -283,7 +303,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{ {
// Mark atomically the request as completed, with respect // Mark atomically the request as completed, with respect
// to concurrency between request success and request failure. // to concurrency between request success and request failure.
boolean completed = exchange.requestComplete(); boolean completed = exchange.requestComplete(null);
if (!completed) if (!completed)
return false; return false;
@ -292,18 +312,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
// Reset to be ready for another request. // Reset to be ready for another request.
reset(); reset();
// Mark atomically the request as terminated and succeeded,
// with respect to concurrency between request and response.
Result result = exchange.terminateRequest(null);
Request request = exchange.getRequest(); Request request = exchange.getRequest();
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request success {}", request); LOG.debug("Request success {}", request);
HttpDestination destination = getHttpChannel().getHttpDestination(); HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifySuccess(exchange.getRequest()); destination.getRequestNotifier().notifySuccess(exchange.getRequest());
// Mark atomically the request as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, null, result); terminateRequest(exchange, null, result);
return true; return true;
} }
default: default:
@ -321,64 +339,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
// Mark atomically the request as completed, with respect // Mark atomically the request as completed, with respect
// to concurrency between request success and request failure. // to concurrency between request success and request failure.
boolean completed = exchange.requestComplete(); if (exchange.requestComplete(failure))
if (!completed) return abort(exchange, failure);
return false;
this.failure = failure; return false;
// Update the state to avoid more request processing.
RequestState current;
boolean fail;
while (true)
{
current = requestState.get();
if (updateRequestState(current, RequestState.FAILURE))
{
fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT;
break;
}
}
dispose();
Result result = failRequest(exchange, failure);
if (fail)
{
terminateRequest(exchange, failure, result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
}
return true;
} }
private Result failRequest(HttpExchange exchange, Throwable failure) private void terminateRequest(HttpExchange exchange)
{ {
// Mark atomically the request as terminated and failed, // In abort(), the state is updated before the failure is recorded
// with respect to concurrency between request and response. // to avoid to overwrite it, so here we may read a null failure.
Result result = exchange.terminateRequest(failure); Throwable failure = this.failure;
if (failure == null)
Request request = exchange.getRequest(); failure = new HttpRequestException("Concurrent failure", exchange.getRequest());
if (LOG.isDebugEnabled()) Result result = exchange.terminateRequest();
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure); terminateRequest(exchange, failure, result);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
return result;
}
private void terminateRequest(HttpExchange exchange, Throwable failure)
{
if (exchange != null)
{
Result result = failRequest(exchange, failure);
terminateRequest(exchange, failure, result);
}
} }
private void terminateRequest(HttpExchange exchange, Throwable failure, Result result) private void terminateRequest(HttpExchange exchange, Throwable failure, Result result)
@ -392,9 +367,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
{ {
if (failure != null) if (failure != null)
{ {
if (LOG.isDebugEnabled()) if (exchange.responseComplete(failure))
LOG.debug("Response failure from request {} {}", request, exchange); {
getHttpChannel().abortResponse(failure); if (LOG.isDebugEnabled())
LOG.debug("Response failure from request {} {}", request, exchange);
getHttpChannel().abortResponse(exchange, failure);
}
} }
} }
else else
@ -402,13 +380,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
HttpDestination destination = getHttpChannel().getHttpDestination(); HttpDestination destination = getHttpChannel().getHttpDestination();
boolean ordered = destination.getHttpClient().isStrictEventOrdering(); boolean ordered = destination.getHttpClient().isStrictEventOrdering();
if (!ordered) if (!ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(exchange, result);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result);
HttpConversation conversation = exchange.getConversation(); HttpConversation conversation = exchange.getConversation();
destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result); destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result);
if (ordered) if (ordered)
channel.exchangeTerminated(result); channel.exchangeTerminated(exchange, result);
} }
} }
@ -528,9 +506,55 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
} }
} }
public boolean abort(Throwable failure) public boolean abort(HttpExchange exchange, Throwable failure)
{ {
return anyToFailure(failure); // Update the state to avoid more request processing.
boolean terminate;
out: while (true)
{
RequestState current = requestState.get();
switch (current)
{
case FAILURE:
{
return false;
}
default:
{
if (updateRequestState(current, RequestState.FAILURE))
{
terminate = current != RequestState.TRANSIENT;
break out;
}
break;
}
}
}
this.failure = failure;
dispose();
Request request = exchange.getRequest();
if (LOG.isDebugEnabled())
LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure);
HttpDestination destination = getHttpChannel().getHttpDestination();
destination.getRequestNotifier().notifyFailure(request, failure);
if (terminate)
{
// Mark atomically the request as terminated, with
// respect to concurrency between request and response.
Result result = exchange.terminateRequest();
terminateRequest(exchange, failure, result);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Concurrent failure: request termination skipped, performed by helpers");
}
return true;
} }
private boolean updateRequestState(RequestState from, RequestState to) private boolean updateRequestState(RequestState from, RequestState to)
@ -574,10 +598,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
* One of the state transition methods is being executed. * One of the state transition methods is being executed.
*/ */
TRANSIENT, TRANSIENT,
/**
* The content transition method is being executed.
*/
TRANSIENT_CONTENT,
/** /**
* The request is queued, the initial state * The request is queued, the initial state
*/ */
@ -686,8 +706,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
if (exchange == null) if (exchange == null)
return; return;
Request request = exchange.getRequest(); if (!headersToCommit(exchange))
if (!headersToCommit(request))
return; return;
HttpContent content = HttpSender.this.content; HttpContent content = HttpSender.this.content;
@ -705,7 +724,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
ByteBuffer contentBuffer = content.getContent(); ByteBuffer contentBuffer = content.getContent();
if (contentBuffer != null) if (contentBuffer != null)
{ {
if (!someToContent(request, contentBuffer)) if (!someToContent(exchange, contentBuffer))
return; return;
} }
@ -840,7 +859,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener
return; return;
content.succeeded(); content.succeeded();
ByteBuffer buffer = content.getContent(); ByteBuffer buffer = content.getContent();
someToContent(exchange.getRequest(), buffer); someToContent(exchange, buffer);
super.succeeded(); super.succeeded();
} }

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.client.http;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Response;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpFields;
@ -51,6 +53,18 @@ public class HttpChannelOverHTTP extends HttpChannel
return new HttpReceiverOverHTTP(this); return new HttpReceiverOverHTTP(this);
} }
@Override
protected HttpSender getHttpSender()
{
return sender;
}
@Override
protected HttpReceiver getHttpReceiver()
{
return receiver;
}
public HttpConnectionOverHTTP getHttpConnection() public HttpConnectionOverHTTP getHttpConnection()
{ {
return connection; return connection;
@ -65,23 +79,9 @@ public class HttpChannelOverHTTP extends HttpChannel
} }
@Override @Override
public void proceed(HttpExchange exchange, Throwable failure) public void release()
{ {
sender.proceed(exchange, failure); connection.release();
}
@Override
public boolean abort(Throwable cause)
{
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = abortResponse(cause);
return sendAborted || receiveAborted;
}
@Override
public boolean abortResponse(Throwable cause)
{
return receiver.abort(cause);
} }
public void receive() public void receive()
@ -90,9 +90,9 @@ public class HttpChannelOverHTTP extends HttpChannel
} }
@Override @Override
public void exchangeTerminated(Result result) public void exchangeTerminated(HttpExchange exchange, Result result)
{ {
super.exchangeTerminated(result); super.exchangeTerminated(exchange, result);
Response response = result.getResponse(); Response response = result.getResponse();
HttpFields responseHeaders = response.getHeaders(); HttpFields responseHeaders = response.getHeaders();
@ -115,7 +115,7 @@ public class HttpChannelOverHTTP extends HttpChannel
if (close) if (close)
connection.close(); connection.close();
else else
connection.release(); release();
} }
@Override @Override

View File

@ -200,8 +200,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec
endPoint.setIdleTimeout(request.getIdleTimeout()); endPoint.setIdleTimeout(request.getIdleTimeout());
// One channel per connection, just delegate the send // One channel per connection, just delegate the send
channel.associate(exchange); if (channel.associate(exchange))
channel.send(); channel.send();
else
channel.release();
} }
@Override @Override

View File

@ -23,7 +23,7 @@ import java.io.OutputStream;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import javax.servlet.ServletException; import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse; import javax.servlet.http.HttpServletResponse;
@ -176,4 +176,84 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest
}); });
Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS));
} }
@Test
public void testAbortOnContentBeforeRequestTermination() throws Exception
{
start(new AbstractHandler()
{
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
try
{
baseRequest.setHandled(true);
OutputStream output = response.getOutputStream();
output.write(1);
output.flush();
output.write(2);
output.flush();
}
catch (IOException ignored)
{
// The client may have already closed, and we'll get an exception here, but it's expected
}
}
});
final CountDownLatch abortLatch = new CountDownLatch(1);
final AtomicInteger completes = new AtomicInteger();
final CountDownLatch completeLatch = new CountDownLatch(1);
client.newRequest("localhost", connector.getLocalPort())
.scheme(scheme)
.onRequestSuccess(new org.eclipse.jetty.client.api.Request.SuccessListener()
{
@Override
public void onSuccess(org.eclipse.jetty.client.api.Request request)
{
try
{
abortLatch.await(5, TimeUnit.SECONDS);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
})
.onResponseContent(new Response.ContentListener()
{
@Override
public void onContent(Response response, ByteBuffer content)
{
try
{
response.abort(new Exception());
abortLatch.countDown();
// Delay to let the request side to finish its processing.
Thread.sleep(1000);
}
catch (InterruptedException x)
{
x.printStackTrace();
}
}
})
.send(new Response.CompleteListener()
{
@Override
public void onComplete(Result result)
{
completes.incrementAndGet();
Assert.assertTrue(result.isFailed());
completeLatch.countDown();
}
});
Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS));
// Wait to be sure that the complete event is only notified once.
Thread.sleep(1000);
Assert.assertEquals(1, completes.get());
}
} }

View File

@ -74,9 +74,10 @@ public class HttpReceiverOverHTTPTest
HttpRequest request = (HttpRequest)client.newRequest("http://localhost"); HttpRequest request = (HttpRequest)client.newRequest("http://localhost");
FutureResponseListener listener = new FutureResponseListener(request); FutureResponseListener listener = new FutureResponseListener(request);
HttpExchange exchange = new HttpExchange(destination, request, Collections.<Response.ResponseListener>singletonList(listener)); HttpExchange exchange = new HttpExchange(destination, request, Collections.<Response.ResponseListener>singletonList(listener));
connection.getHttpChannel().associate(exchange); boolean associated = connection.getHttpChannel().associate(exchange);
exchange.requestComplete(); Assert.assertTrue(associated);
exchange.terminateRequest(null); exchange.requestComplete(null);
exchange.terminateRequest();
return exchange; return exchange;
} }

View File

@ -23,6 +23,8 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpChannel;
import org.eclipse.jetty.client.HttpExchange; import org.eclipse.jetty.client.HttpExchange;
import org.eclipse.jetty.client.HttpReceiver;
import org.eclipse.jetty.client.HttpSender;
import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.client.api.Result;
import org.eclipse.jetty.fcgi.generator.Flusher; import org.eclipse.jetty.fcgi.generator.Flusher;
import org.eclipse.jetty.fcgi.generator.Generator; import org.eclipse.jetty.fcgi.generator.Generator;
@ -58,6 +60,18 @@ public class HttpChannelOverFCGI extends HttpChannel
return request; return request;
} }
@Override
protected HttpSender getHttpSender()
{
return sender;
}
@Override
protected HttpReceiver getHttpReceiver()
{
return receiver;
}
@Override @Override
public void send() public void send()
{ {
@ -70,23 +84,9 @@ public class HttpChannelOverFCGI extends HttpChannel
} }
@Override @Override
public void proceed(HttpExchange exchange, Throwable failure) public void release()
{ {
sender.proceed(exchange, failure); connection.release(this);
}
@Override
public boolean abort(Throwable cause)
{
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = abortResponse(cause);
return sendAborted || receiveAborted;
}
@Override
public boolean abortResponse(Throwable cause)
{
return receiver.abort(cause);
} }
protected boolean responseBegin(int code, String reason) protected boolean responseBegin(int code, String reason)
@ -132,15 +132,15 @@ public class HttpChannelOverFCGI extends HttpChannel
} }
@Override @Override
public void exchangeTerminated(Result result) public void exchangeTerminated(HttpExchange exchange, Result result)
{ {
super.exchangeTerminated(result); super.exchangeTerminated(exchange, result);
idle.onClose(); idle.onClose();
HttpFields responseHeaders = result.getResponse().getHeaders(); HttpFields responseHeaders = result.getResponse().getHeaders();
if (result.isFailed()) if (result.isFailed())
connection.close(result.getFailure()); connection.close(result.getFailure());
else if (!connection.closeByHTTP(responseHeaders)) else if (!connection.closeByHTTP(responseHeaders))
connection.release(this); release();
} }
protected void flush(Generator.Result... results) protected void flush(Generator.Result... results)

View File

@ -286,8 +286,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec
int id = acquireRequest(); int id = acquireRequest();
HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout()); HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout());
channels.put(id, channel); channels.put(id, channel);
channel.associate(exchange); if (channel.associate(exchange))
channel.send(); channel.send();
else
channel.release();
} }
@Override @Override

View File

@ -64,29 +64,15 @@ public class HttpChannelOverSPDY extends HttpChannel
} }
@Override @Override
public void proceed(HttpExchange exchange, Throwable failure) public void release()
{ {
sender.proceed(exchange, failure);
}
@Override
public boolean abort(Throwable cause)
{
boolean sendAborted = sender.abort(cause);
boolean receiveAborted = abortResponse(cause);
return sendAborted || receiveAborted;
}
@Override
public boolean abortResponse(Throwable cause)
{
return receiver.abort(cause);
}
@Override
public void exchangeTerminated(Result result)
{
super.exchangeTerminated(result);
connection.release(this); connection.release(this);
} }
@Override
public void exchangeTerminated(HttpExchange exchange, Result result)
{
super.exchangeTerminated(exchange, result);
release();
}
} }

View File

@ -48,8 +48,10 @@ public class HttpConnectionOverSPDY extends HttpConnection
// One connection maps to N channels, so for each exchange we create a new channel // One connection maps to N channels, so for each exchange we create a new channel
HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), this, session); HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), this, session);
channels.add(channel); channels.add(channel);
channel.associate(exchange); if (channel.associate(exchange))
channel.send(); channel.send();
else
channel.release();
} }
protected void release(HttpChannel channel) protected void release(HttpChannel channel)