From 1dc66b72dd3788c1903e55cf1c8afcb2d4f7d343 Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 24 Mar 2015 16:19:45 +0100 Subject: [PATCH] 461499 - ConnectionPool may leak connections. Made associate(), disassociate() and abort() atomic operations using the HttpExchange state to coordinate atomicity. In this way, it's not possible to associate a HttpChannel and a HttpExchange if the latter has been aborted. --- .../org/eclipse/jetty/client/HttpChannel.java | 110 ++++++-- .../eclipse/jetty/client/HttpExchange.java | 262 +++++++++++------- .../eclipse/jetty/client/HttpReceiver.java | 151 +++++----- .../org/eclipse/jetty/client/HttpSender.java | 209 +++++++------- .../client/http/HttpChannelOverHTTP.java | 38 +-- .../client/http/HttpConnectionOverHTTP.java | 6 +- .../jetty/client/HttpResponseAbortTest.java | 82 +++++- .../client/http/HttpReceiverOverHTTPTest.java | 7 +- .../fcgi/client/http/HttpChannelOverFCGI.java | 38 +-- .../client/http/HttpConnectionOverFCGI.java | 6 +- .../spdy/client/http/HttpChannelOverSPDY.java | 30 +- .../client/http/HttpConnectionOverSPDY.java | 6 +- 12 files changed, 580 insertions(+), 365 deletions(-) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java index 74d762e8939..cf18ba24b7a 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpChannel.java @@ -18,74 +18,126 @@ package org.eclipse.jetty.client; -import java.util.concurrent.atomic.AtomicReference; - import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.SpinLock; public abstract class HttpChannel { protected static final Logger LOG = Log.getLogger(HttpChannel.class); - private final AtomicReference exchange = new AtomicReference<>(); - private final HttpDestination destination; + private final SpinLock _lock = new SpinLock(); + private final HttpDestination _destination; + private HttpExchange _exchange; protected HttpChannel(HttpDestination destination) { - this.destination = destination; + this._destination = destination; } public HttpDestination getHttpDestination() { - return destination; + return _destination; } - public void associate(HttpExchange exchange) + /** + *

Associates the given {@code exchange} to this channel in order to be sent over the network.

+ *

If the association is successful, the exchange can be sent. Otherwise, the channel must be + * disposed because whoever terminated the exchange did not do it - it did not have the channel yet.

+ * + * @param exchange the exchange to associate + * @return true if the association was successful, false otherwise + */ + public boolean associate(HttpExchange exchange) { - if (this.exchange.compareAndSet(null, exchange)) + boolean result = false; + boolean abort = true; + try (SpinLock.Lock lock = _lock.lock()) { - exchange.associate(this); - if (LOG.isDebugEnabled()) - LOG.debug("{} associated to {}", exchange, this); + if (_exchange == null) + { + abort = false; + result = exchange.associate(this); + if (result) + _exchange = exchange; + } } - else - { + + if (abort) exchange.getRequest().abort(new UnsupportedOperationException("Pipelined requests not supported")); - } + + if (LOG.isDebugEnabled()) + LOG.debug("{} associated {} to {}", exchange, result, this); + + return result; } - public HttpExchange disassociate() + public boolean disassociate(HttpExchange exchange) { - HttpExchange exchange = this.exchange.getAndSet(null); - if (exchange != null) - exchange.disassociate(this); + boolean result = false; + try (SpinLock.Lock lock = _lock.lock()) + { + HttpExchange existing = _exchange; + _exchange = null; + if (existing == exchange) + { + existing.disassociate(this); + result = true; + } + } if (LOG.isDebugEnabled()) - LOG.debug("{} disassociated from {}", exchange, this); - return exchange; + LOG.debug("{} disassociated {} from {}", exchange, result, this); + return result; } public HttpExchange getHttpExchange() { - return exchange.get(); + try (SpinLock.Lock lock = _lock.lock()) + { + return _exchange; + } } + protected abstract HttpSender getHttpSender(); + + protected abstract HttpReceiver getHttpReceiver(); + public abstract void send(); - public abstract void proceed(HttpExchange exchange, Throwable failure); + public abstract void release(); - public abstract boolean abort(Throwable cause); - - public abstract boolean abortResponse(Throwable cause); - - public void exchangeTerminated(Result result) + public void proceed(HttpExchange exchange, Throwable failure) { - disassociate(); + getHttpSender().proceed(exchange, failure); + } + + public boolean abort(HttpExchange exchange, Throwable requestFailure, Throwable responseFailure) + { + boolean requestAborted = false; + if (requestFailure != null) + requestAborted = getHttpSender().abort(exchange, requestFailure); + + boolean responseAborted = false; + if (responseFailure != null) + responseAborted = abortResponse(exchange, responseFailure); + + return requestAborted || responseAborted; + } + + public boolean abortResponse(HttpExchange exchange, Throwable failure) + { + return getHttpReceiver().abort(exchange, failure); + } + + public void exchangeTerminated(HttpExchange exchange, Result result) + { + disassociate(exchange); } @Override public String toString() { - return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), exchange); + return String.format("%s@%x(exchange=%s)", getClass().getSimpleName(), hashCode(), getHttpExchange()); } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index 704884c1dc5..708b26bce56 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -19,7 +19,6 @@ package org.eclipse.jetty.client; import java.util.List; -import java.util.concurrent.atomic.AtomicReference; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; @@ -31,21 +30,14 @@ public class HttpExchange { private static final Logger LOG = Log.getLogger(HttpExchange.class); - private final AtomicReference channel = new AtomicReference<>(); private final HttpDestination destination; private final HttpRequest request; private final List listeners; private final HttpResponse response; - - enum State - { - PENDING, COMPLETED, TERMINATED - } - - ; private final SpinLock _lock = new SpinLock(); private State requestState = State.PENDING; private State responseState = State.PENDING; + private HttpChannel _channel; private Throwable requestFailure; private Throwable responseFailure; @@ -96,120 +88,187 @@ public class HttpExchange } } - public void associate(HttpChannel channel) + /** + *

Associates the given {@code channel} to this exchange.

+ *

Works in strict collaboration with {@link HttpChannel#associate(HttpExchange)}.

+ * + * @param channel the channel to associate to this exchange + * @return true if the channel could be associated, false otherwise + */ + boolean associate(HttpChannel channel) { - if (!this.channel.compareAndSet(null, channel)) - request.abort(new IllegalStateException()); + boolean result = false; + boolean abort = false; + try (SpinLock.Lock lock = _lock.lock()) + { + // Only associate if the exchange state is initial, + // as the exchange could be already failed. + if (requestState == State.PENDING && responseState == State.PENDING) + { + abort = _channel != null; + if (!abort) + { + _channel = channel; + result = true; + } + } + } + + if (abort) + request.abort(new IllegalStateException(toString())); + + return result; } - public void disassociate(HttpChannel channel) + void disassociate(HttpChannel channel) { - if (!this.channel.compareAndSet(channel, null)) - request.abort(new IllegalStateException()); + boolean abort = false; + try (SpinLock.Lock lock = _lock.lock()) + { + if (_channel != channel || requestState != State.TERMINATED || responseState != State.TERMINATED) + abort = true; + _channel = null; + } + + if (abort) + request.abort(new IllegalStateException(toString())); } - public boolean requestComplete() + private HttpChannel getHttpChannel() { try (SpinLock.Lock lock = _lock.lock()) { - if (requestState != State.PENDING) - return false; + return _channel; + } + } + + public boolean requestComplete(Throwable failure) + { + try (SpinLock.Lock lock = _lock.lock()) + { + return completeRequest(failure); + } + } + + private boolean completeRequest(Throwable failure) + { + if (requestState == State.PENDING) + { requestState = State.COMPLETED; - return true; - } - } - - public boolean responseComplete() - { - try (SpinLock.Lock lock = _lock.lock()) - { - if (responseState != State.PENDING) - return false; - responseState = State.COMPLETED; - return true; - } - } - - public Result terminateRequest(Throwable failure) - { - try (SpinLock.Lock lock = _lock.lock()) - { - requestState = State.TERMINATED; requestFailure = failure; - if (State.TERMINATED.equals(responseState)) - return new Result(getRequest(), requestFailure, getResponse(), responseFailure); + return true; } - return null; + return false; } - public Result terminateResponse(Throwable failure) + public boolean responseComplete(Throwable failure) { try (SpinLock.Lock lock = _lock.lock()) { - responseState = State.TERMINATED; - responseFailure = failure; - if (requestState == State.TERMINATED) - return new Result(getRequest(), requestFailure, getResponse(), responseFailure); + return completeResponse(failure); } - return null; } - - public boolean abort(Throwable cause) + private boolean completeResponse(Throwable failure) { + if (responseState == State.PENDING) + { + responseState = State.COMPLETED; + responseFailure = failure; + return true; + } + return false; + } + + public Result terminateRequest() + { + Result result = null; + try (SpinLock.Lock lock = _lock.lock()) + { + if (requestState == State.COMPLETED) + requestState = State.TERMINATED; + if (requestState == State.TERMINATED && responseState == State.TERMINATED) + result = new Result(getRequest(), requestFailure, getResponse(), responseFailure); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Terminated request for {}, result: {}", this, result); + + return result; + } + + public Result terminateResponse() + { + Result result = null; + try (SpinLock.Lock lock = _lock.lock()) + { + if (responseState == State.COMPLETED) + responseState = State.TERMINATED; + if (requestState == State.TERMINATED && responseState == State.TERMINATED) + result = new Result(getRequest(), requestFailure, getResponse(), responseFailure); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Terminated response for {}, result: {}", this, result); + + return result; + } + + public boolean abort(Throwable failure) + { + // Atomically change the state of this exchange to be completed. + // This will avoid that this exchange can be associated to a channel. + boolean abortRequest; + boolean abortResponse; + try (SpinLock.Lock lock = _lock.lock()) + { + abortRequest = completeRequest(failure); + abortResponse = completeResponse(failure); + } + + if (LOG.isDebugEnabled()) + LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure); + + if (!abortRequest && !abortResponse) + return false; + + // We failed this exchange, deal with it. + + // Case #1: exchange was in the destination queue. if (destination.remove(this)) { if (LOG.isDebugEnabled()) - LOG.debug("Aborting while queued {}: {}", this, cause); - return fail(cause); - } - else - { - HttpChannel channel = this.channel.get(); - if (channel == null) - return fail(cause); - - boolean aborted = channel.abort(cause); - if (LOG.isDebugEnabled()) - LOG.debug("Aborted ({}) while active {}: {}", aborted, this, cause); - return aborted; - } - } - - private boolean fail(Throwable cause) - { - boolean notify = false; - try (SpinLock.Lock lock = _lock.lock()) - { - if (requestState != State.TERMINATED) - { - requestState = State.TERMINATED; - notify = true; - requestFailure = cause; - } - if (responseState != State.TERMINATED) - { - responseState = State.TERMINATED; - notify = true; - responseFailure = cause; - } - } - - if (notify) - { - if (LOG.isDebugEnabled()) - LOG.debug("Failing {}: {}", this, cause); - destination.getRequestNotifier().notifyFailure(request, cause); - List listeners = getConversation().getResponseListeners(); - ResponseNotifier responseNotifier = destination.getResponseNotifier(); - responseNotifier.notifyFailure(listeners, response, cause); - responseNotifier.notifyComplete(listeners, new Result(request, cause, response, cause)); + LOG.debug("Aborting while queued {}: {}", this, failure); + notifyFailureComplete(failure); return true; } - else + + HttpChannel channel = getHttpChannel(); + if (channel == null) { - return false; + // Case #2: exchange was not yet associated. + // Because this exchange is failed, when associate() is called + // it will return false, and the caller will dispose the channel. + if (LOG.isDebugEnabled()) + LOG.debug("Aborted before association {}: {}", this, failure); + notifyFailureComplete(failure); + return true; } + + // Case #3: exchange was already associated. + boolean aborted = channel.abort(this, abortRequest ? failure : null, abortResponse ? failure : null); + if (LOG.isDebugEnabled()) + LOG.debug("Aborted ({}) while active {}: {}", aborted, this, failure); + return aborted; + } + + private void notifyFailureComplete(Throwable failure) + { + destination.getRequestNotifier().notifyFailure(request, failure); + List listeners = getConversation().getResponseListeners(); + ResponseNotifier responseNotifier = destination.getResponseNotifier(); + responseNotifier.notifyFailure(listeners, response, failure); + responseNotifier.notifyComplete(listeners, new Result(request, failure, response, failure)); } public void resetResponse() @@ -223,7 +282,7 @@ public class HttpExchange public void proceed(Throwable failure) { - HttpChannel channel = this.channel.get(); + HttpChannel channel = getHttpChannel(); if (channel != null) channel.proceed(this, failure); } @@ -233,11 +292,16 @@ public class HttpExchange { try (SpinLock.Lock lock = _lock.lock()) { - return String.format("%s@%x req=%s/%s res=%s/%s", + return String.format("%s@%x req=%s/%s@%h res=%s/%s@%h", HttpExchange.class.getSimpleName(), hashCode(), - requestState, requestFailure, - responseState, responseFailure); + requestState, requestFailure, requestFailure, + responseState, responseFailure, responseFailure); } } + + private enum State + { + PENDING, COMPLETED, TERMINATED + } } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index 10555f9a562..8b5b9fb68d6 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -129,10 +129,11 @@ public abstract class HttpReceiver ResponseNotifier notifier = destination.getResponseNotifier(); notifier.notifyBegin(conversation.getResponseListeners(), response); - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.BEGIN)) + return true; - return true; + terminateResponse(exchange); + return false; } /** @@ -193,10 +194,11 @@ public abstract class HttpReceiver } } - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADER)) + return true; - return true; + terminateResponse(exchange); + return false; } protected void storeCookie(URI uri, HttpField field) @@ -269,10 +271,11 @@ public abstract class HttpReceiver } } - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.HEADERS)) + return true; - return true; + terminateResponse(exchange); + return false; } /** @@ -343,10 +346,11 @@ public abstract class HttpReceiver } } - if (!updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) - terminateResponse(exchange, failure); + if (updateResponseState(ResponseState.TRANSIENT, ResponseState.CONTENT)) + return true; - return true; + terminateResponse(exchange); + return false; } /** @@ -362,7 +366,7 @@ public abstract class HttpReceiver { // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. - boolean completed = exchange.responseComplete(); + boolean completed = exchange.responseComplete(null); if (!completed) return false; @@ -371,12 +375,13 @@ public abstract class HttpReceiver // Reset to be ready for another response. reset(); - // Mark atomically the response as terminated and succeeded, - // with respect to concurrency between request and response. - Result result = exchange.terminateResponse(null); + // Mark atomically the response as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateResponse(); - // It is important to notify *after* we reset and terminate - // because the notification may trigger another request/response. + // Notify *after* resetting and terminating, because + // the notification may trigger another request/response, + // or the reset of the response in case of 100-Continue. HttpResponse response = exchange.getResponse(); if (LOG.isDebugEnabled()) LOG.debug("Response success {}", response); @@ -409,60 +414,15 @@ public abstract class HttpReceiver // Mark atomically the response as completed, with respect // to concurrency between response success and response failure. - boolean completed = exchange.responseComplete(); - if (!completed) - return false; + if (exchange.responseComplete(failure)) + return abort(exchange, failure); - this.failure = failure; - - // Update the state to avoid more response processing. - boolean fail; - while (true) - { - ResponseState current = responseState.get(); - if (updateResponseState(current, ResponseState.FAILURE)) - { - fail = current != ResponseState.TRANSIENT; - break; - } - } - - dispose(); - - Result result = failResponse(exchange, failure); - - if (fail) - { - terminateResponse(exchange, result); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); - } - - return true; + return false; } - private Result failResponse(HttpExchange exchange, Throwable failure) + private void terminateResponse(HttpExchange exchange) { - // Mark atomically the response as terminated and failed, - // with respect to concurrency between request and response. - Result result = exchange.terminateResponse(failure); - - HttpResponse response = exchange.getResponse(); - if (LOG.isDebugEnabled()) - LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure); - List listeners = exchange.getConversation().getResponseListeners(); - ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); - notifier.notifyFailure(listeners, response, failure); - - return result; - } - - private void terminateResponse(HttpExchange exchange, Throwable failure) - { - Result result = failResponse(exchange, failure); + Result result = exchange.terminateResponse(); terminateResponse(exchange, result); } @@ -477,14 +437,14 @@ public abstract class HttpReceiver { boolean ordered = getHttpDestination().getHttpClient().isStrictEventOrdering(); if (!ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); if (LOG.isDebugEnabled()) LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); List listeners = exchange.getConversation().getResponseListeners(); ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); notifier.notifyComplete(listeners, result); if (ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); } } @@ -512,9 +472,56 @@ public abstract class HttpReceiver decoder = null; } - public boolean abort(Throwable cause) + public boolean abort(HttpExchange exchange, Throwable failure) { - return responseFailure(cause); + // Update the state to avoid more response processing. + boolean terminate; + out: while (true) + { + ResponseState current = responseState.get(); + switch (current) + { + case FAILURE: + { + return false; + } + default: + { + if (updateResponseState(current, ResponseState.FAILURE)) + { + terminate = current != ResponseState.TRANSIENT; + break out; + } + break; + } + } + } + + this.failure = failure; + + dispose(); + + HttpResponse response = exchange.getResponse(); + if (LOG.isDebugEnabled()) + LOG.debug("Response failure {} {} on {}: {}", response, exchange, getHttpChannel(), failure); + List listeners = exchange.getConversation().getResponseListeners(); + ResponseNotifier notifier = getHttpDestination().getResponseNotifier(); + notifier.notifyFailure(listeners, response, failure); + + if (terminate) + { + // Mark atomically the response as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateResponse(); + terminateResponse(exchange, result); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Concurrent failure: response termination skipped, performed by helpers"); + } + + return true; } private boolean updateResponseState(ResponseState from, ResponseState to) diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java index 8b515344d61..a7b68e58408 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpSender.java @@ -162,10 +162,10 @@ public abstract class HttpSender implements AsyncContentProvider.Listener public void send(HttpExchange exchange) { - Request request = exchange.getRequest(); - if (!queuedToBegin(request)) + if (!queuedToBegin(exchange)) return; + Request request = exchange.getRequest(); ContentProvider contentProvider = request.getContent(); HttpContent content = this.content = new HttpContent(contentProvider); @@ -198,7 +198,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (contentProvider instanceof AsyncContentProvider) ((AsyncContentProvider)contentProvider).setListener(this); - if (!beginToHeaders(request)) + if (!beginToHeaders(exchange)) return; sendHeaders(exchange, content, commitCallback); @@ -209,46 +209,61 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return request.getHeaders().contains(HttpHeader.EXPECT, HttpHeaderValue.CONTINUE.asString()); } - protected boolean queuedToBegin(Request request) + protected boolean queuedToBegin(HttpExchange exchange) { if (!updateRequestState(RequestState.QUEUED, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request begin {}", request); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyBegin(request); - if (!updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.BEGIN)) + return true; + + terminateRequest(exchange); + return false; } - protected boolean beginToHeaders(Request request) + protected boolean beginToHeaders(HttpExchange exchange) { if (!updateRequestState(RequestState.BEGIN, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request headers {}{}{}", request, System.lineSeparator(), request.getHeaders().toString().trim()); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyHeaders(request); - if (!updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.HEADERS)) + return true; + + terminateRequest(exchange); + return false; } - protected boolean headersToCommit(Request request) + protected boolean headersToCommit(HttpExchange exchange) { if (!updateRequestState(RequestState.HEADERS, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request committed {}", request); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyCommit(request); - if (!updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.COMMIT)) + return true; + + terminateRequest(exchange); + return false; } - protected boolean someToContent(Request request, ByteBuffer content) + protected boolean someToContent(HttpExchange exchange, ByteBuffer content) { RequestState current = requestState.get(); switch (current) @@ -256,15 +271,20 @@ public abstract class HttpSender implements AsyncContentProvider.Listener case COMMIT: case CONTENT: { - if (!updateRequestState(current, RequestState.TRANSIENT_CONTENT)) + if (!updateRequestState(current, RequestState.TRANSIENT)) return false; + + Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request content {}{}{}", request, System.lineSeparator(), BufferUtil.toDetailString(content)); RequestNotifier notifier = getHttpChannel().getHttpDestination().getRequestNotifier(); notifier.notifyContent(request, content); - if (!updateRequestState(RequestState.TRANSIENT_CONTENT, RequestState.CONTENT)) - terminateRequest(getHttpExchange(), failure); - return true; + + if (updateRequestState(RequestState.TRANSIENT, RequestState.CONTENT)) + return true; + + terminateRequest(exchange); + return false; } default: { @@ -283,7 +303,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { // Mark atomically the request as completed, with respect // to concurrency between request success and request failure. - boolean completed = exchange.requestComplete(); + boolean completed = exchange.requestComplete(null); if (!completed) return false; @@ -292,18 +312,16 @@ public abstract class HttpSender implements AsyncContentProvider.Listener // Reset to be ready for another request. reset(); - // Mark atomically the request as terminated and succeeded, - // with respect to concurrency between request and response. - Result result = exchange.terminateRequest(null); - Request request = exchange.getRequest(); if (LOG.isDebugEnabled()) LOG.debug("Request success {}", request); HttpDestination destination = getHttpChannel().getHttpDestination(); destination.getRequestNotifier().notifySuccess(exchange.getRequest()); + // Mark atomically the request as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateRequest(); terminateRequest(exchange, null, result); - return true; } default: @@ -321,64 +339,21 @@ public abstract class HttpSender implements AsyncContentProvider.Listener // Mark atomically the request as completed, with respect // to concurrency between request success and request failure. - boolean completed = exchange.requestComplete(); - if (!completed) - return false; + if (exchange.requestComplete(failure)) + return abort(exchange, failure); - this.failure = failure; - - // Update the state to avoid more request processing. - RequestState current; - boolean fail; - while (true) - { - current = requestState.get(); - if (updateRequestState(current, RequestState.FAILURE)) - { - fail = current != RequestState.TRANSIENT && current != RequestState.TRANSIENT_CONTENT; - break; - } - } - - dispose(); - - Result result = failRequest(exchange, failure); - - if (fail) - { - terminateRequest(exchange, failure, result); - } - else - { - if (LOG.isDebugEnabled()) - LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); - } - - return true; + return false; } - private Result failRequest(HttpExchange exchange, Throwable failure) + private void terminateRequest(HttpExchange exchange) { - // Mark atomically the request as terminated and failed, - // with respect to concurrency between request and response. - Result result = exchange.terminateRequest(failure); - - Request request = exchange.getRequest(); - if (LOG.isDebugEnabled()) - LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure); - HttpDestination destination = getHttpChannel().getHttpDestination(); - destination.getRequestNotifier().notifyFailure(request, failure); - - return result; - } - - private void terminateRequest(HttpExchange exchange, Throwable failure) - { - if (exchange != null) - { - Result result = failRequest(exchange, failure); - terminateRequest(exchange, failure, result); - } + // In abort(), the state is updated before the failure is recorded + // to avoid to overwrite it, so here we may read a null failure. + Throwable failure = this.failure; + if (failure == null) + failure = new HttpRequestException("Concurrent failure", exchange.getRequest()); + Result result = exchange.terminateRequest(); + terminateRequest(exchange, failure, result); } private void terminateRequest(HttpExchange exchange, Throwable failure, Result result) @@ -392,9 +367,12 @@ public abstract class HttpSender implements AsyncContentProvider.Listener { if (failure != null) { - if (LOG.isDebugEnabled()) - LOG.debug("Response failure from request {} {}", request, exchange); - getHttpChannel().abortResponse(failure); + if (exchange.responseComplete(failure)) + { + if (LOG.isDebugEnabled()) + LOG.debug("Response failure from request {} {}", request, exchange); + getHttpChannel().abortResponse(exchange, failure); + } } } else @@ -402,13 +380,13 @@ public abstract class HttpSender implements AsyncContentProvider.Listener HttpDestination destination = getHttpChannel().getHttpDestination(); boolean ordered = destination.getHttpClient().isStrictEventOrdering(); if (!ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); if (LOG.isDebugEnabled()) LOG.debug("Request/Response {}: {}", failure == null ? "succeeded" : "failed", result); HttpConversation conversation = exchange.getConversation(); destination.getResponseNotifier().notifyComplete(conversation.getResponseListeners(), result); if (ordered) - channel.exchangeTerminated(result); + channel.exchangeTerminated(exchange, result); } } @@ -528,9 +506,55 @@ public abstract class HttpSender implements AsyncContentProvider.Listener } } - public boolean abort(Throwable failure) + public boolean abort(HttpExchange exchange, Throwable failure) { - return anyToFailure(failure); + // Update the state to avoid more request processing. + boolean terminate; + out: while (true) + { + RequestState current = requestState.get(); + switch (current) + { + case FAILURE: + { + return false; + } + default: + { + if (updateRequestState(current, RequestState.FAILURE)) + { + terminate = current != RequestState.TRANSIENT; + break out; + } + break; + } + } + } + + this.failure = failure; + + dispose(); + + Request request = exchange.getRequest(); + if (LOG.isDebugEnabled()) + LOG.debug("Request failure {} {} on {}: {}", request, exchange, getHttpChannel(), failure); + HttpDestination destination = getHttpChannel().getHttpDestination(); + destination.getRequestNotifier().notifyFailure(request, failure); + + if (terminate) + { + // Mark atomically the request as terminated, with + // respect to concurrency between request and response. + Result result = exchange.terminateRequest(); + terminateRequest(exchange, failure, result); + } + else + { + if (LOG.isDebugEnabled()) + LOG.debug("Concurrent failure: request termination skipped, performed by helpers"); + } + + return true; } private boolean updateRequestState(RequestState from, RequestState to) @@ -574,10 +598,6 @@ public abstract class HttpSender implements AsyncContentProvider.Listener * One of the state transition methods is being executed. */ TRANSIENT, - /** - * The content transition method is being executed. - */ - TRANSIENT_CONTENT, /** * The request is queued, the initial state */ @@ -686,8 +706,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener if (exchange == null) return; - Request request = exchange.getRequest(); - if (!headersToCommit(request)) + if (!headersToCommit(exchange)) return; HttpContent content = HttpSender.this.content; @@ -705,7 +724,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener ByteBuffer contentBuffer = content.getContent(); if (contentBuffer != null) { - if (!someToContent(request, contentBuffer)) + if (!someToContent(exchange, contentBuffer)) return; } @@ -840,7 +859,7 @@ public abstract class HttpSender implements AsyncContentProvider.Listener return; content.succeeded(); ByteBuffer buffer = content.getContent(); - someToContent(exchange.getRequest(), buffer); + someToContent(exchange, buffer); super.succeeded(); } diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java index 1b3367bb9ea..6578c4438ea 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpChannelOverHTTP.java @@ -20,6 +20,8 @@ package org.eclipse.jetty.client.http; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpReceiver; +import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Response; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.http.HttpFields; @@ -51,6 +53,18 @@ public class HttpChannelOverHTTP extends HttpChannel return new HttpReceiverOverHTTP(this); } + @Override + protected HttpSender getHttpSender() + { + return sender; + } + + @Override + protected HttpReceiver getHttpReceiver() + { + return receiver; + } + public HttpConnectionOverHTTP getHttpConnection() { return connection; @@ -65,23 +79,9 @@ public class HttpChannelOverHTTP extends HttpChannel } @Override - public void proceed(HttpExchange exchange, Throwable failure) + public void release() { - sender.proceed(exchange, failure); - } - - @Override - public boolean abort(Throwable cause) - { - boolean sendAborted = sender.abort(cause); - boolean receiveAborted = abortResponse(cause); - return sendAborted || receiveAborted; - } - - @Override - public boolean abortResponse(Throwable cause) - { - return receiver.abort(cause); + connection.release(); } public void receive() @@ -90,9 +90,9 @@ public class HttpChannelOverHTTP extends HttpChannel } @Override - public void exchangeTerminated(Result result) + public void exchangeTerminated(HttpExchange exchange, Result result) { - super.exchangeTerminated(result); + super.exchangeTerminated(exchange, result); Response response = result.getResponse(); HttpFields responseHeaders = response.getHeaders(); @@ -115,7 +115,7 @@ public class HttpChannelOverHTTP extends HttpChannel if (close) connection.close(); else - connection.release(); + release(); } @Override diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java index 62aa17ca471..28c4c2ecd90 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/http/HttpConnectionOverHTTP.java @@ -200,8 +200,10 @@ public class HttpConnectionOverHTTP extends AbstractConnection implements Connec endPoint.setIdleTimeout(request.getIdleTimeout()); // One channel per connection, just delegate the send - channel.associate(exchange); - channel.send(); + if (channel.associate(exchange)) + channel.send(); + else + channel.release(); } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java index 304d818e966..7a22d1253c7 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/HttpResponseAbortTest.java @@ -23,7 +23,7 @@ import java.io.OutputStream; import java.nio.ByteBuffer; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; - +import java.util.concurrent.atomic.AtomicInteger; import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -176,4 +176,84 @@ public class HttpResponseAbortTest extends AbstractHttpClientServerTest }); Assert.assertTrue(latch.await(5, TimeUnit.SECONDS)); } + + @Test + public void testAbortOnContentBeforeRequestTermination() throws Exception + { + start(new AbstractHandler() + { + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + try + { + baseRequest.setHandled(true); + OutputStream output = response.getOutputStream(); + output.write(1); + output.flush(); + output.write(2); + output.flush(); + } + catch (IOException ignored) + { + // The client may have already closed, and we'll get an exception here, but it's expected + } + } + }); + + final CountDownLatch abortLatch = new CountDownLatch(1); + final AtomicInteger completes = new AtomicInteger(); + final CountDownLatch completeLatch = new CountDownLatch(1); + client.newRequest("localhost", connector.getLocalPort()) + .scheme(scheme) + .onRequestSuccess(new org.eclipse.jetty.client.api.Request.SuccessListener() + { + @Override + public void onSuccess(org.eclipse.jetty.client.api.Request request) + { + try + { + abortLatch.await(5, TimeUnit.SECONDS); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + }) + .onResponseContent(new Response.ContentListener() + { + @Override + public void onContent(Response response, ByteBuffer content) + { + try + { + response.abort(new Exception()); + abortLatch.countDown(); + // Delay to let the request side to finish its processing. + Thread.sleep(1000); + } + catch (InterruptedException x) + { + x.printStackTrace(); + } + } + }) + .send(new Response.CompleteListener() + { + @Override + public void onComplete(Result result) + { + completes.incrementAndGet(); + Assert.assertTrue(result.isFailed()); + completeLatch.countDown(); + } + }); + Assert.assertTrue(completeLatch.await(5, TimeUnit.SECONDS)); + + // Wait to be sure that the complete event is only notified once. + Thread.sleep(1000); + + Assert.assertEquals(1, completes.get()); + } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java index 717cffc1f7e..6894841fa5e 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/http/HttpReceiverOverHTTPTest.java @@ -74,9 +74,10 @@ public class HttpReceiverOverHTTPTest HttpRequest request = (HttpRequest)client.newRequest("http://localhost"); FutureResponseListener listener = new FutureResponseListener(request); HttpExchange exchange = new HttpExchange(destination, request, Collections.singletonList(listener)); - connection.getHttpChannel().associate(exchange); - exchange.requestComplete(); - exchange.terminateRequest(null); + boolean associated = connection.getHttpChannel().associate(exchange); + Assert.assertTrue(associated); + exchange.requestComplete(null); + exchange.terminateRequest(); return exchange; } diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java index e9a9ef43557..fb47ec7ef4c 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpChannelOverFCGI.java @@ -23,6 +23,8 @@ import java.util.concurrent.TimeoutException; import org.eclipse.jetty.client.HttpChannel; import org.eclipse.jetty.client.HttpExchange; +import org.eclipse.jetty.client.HttpReceiver; +import org.eclipse.jetty.client.HttpSender; import org.eclipse.jetty.client.api.Result; import org.eclipse.jetty.fcgi.generator.Flusher; import org.eclipse.jetty.fcgi.generator.Generator; @@ -58,6 +60,18 @@ public class HttpChannelOverFCGI extends HttpChannel return request; } + @Override + protected HttpSender getHttpSender() + { + return sender; + } + + @Override + protected HttpReceiver getHttpReceiver() + { + return receiver; + } + @Override public void send() { @@ -70,23 +84,9 @@ public class HttpChannelOverFCGI extends HttpChannel } @Override - public void proceed(HttpExchange exchange, Throwable failure) + public void release() { - sender.proceed(exchange, failure); - } - - @Override - public boolean abort(Throwable cause) - { - boolean sendAborted = sender.abort(cause); - boolean receiveAborted = abortResponse(cause); - return sendAborted || receiveAborted; - } - - @Override - public boolean abortResponse(Throwable cause) - { - return receiver.abort(cause); + connection.release(this); } protected boolean responseBegin(int code, String reason) @@ -132,15 +132,15 @@ public class HttpChannelOverFCGI extends HttpChannel } @Override - public void exchangeTerminated(Result result) + public void exchangeTerminated(HttpExchange exchange, Result result) { - super.exchangeTerminated(result); + super.exchangeTerminated(exchange, result); idle.onClose(); HttpFields responseHeaders = result.getResponse().getHeaders(); if (result.isFailed()) connection.close(result.getFailure()); else if (!connection.closeByHTTP(responseHeaders)) - connection.release(this); + release(); } protected void flush(Generator.Result... results) diff --git a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java index 14cf7d56ecd..056098022be 100644 --- a/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java +++ b/jetty-fcgi/fcgi-client/src/main/java/org/eclipse/jetty/fcgi/client/http/HttpConnectionOverFCGI.java @@ -286,8 +286,10 @@ public class HttpConnectionOverFCGI extends AbstractConnection implements Connec int id = acquireRequest(); HttpChannelOverFCGI channel = new HttpChannelOverFCGI(HttpConnectionOverFCGI.this, flusher, id, request.getIdleTimeout()); channels.put(id, channel); - channel.associate(exchange); - channel.send(); + if (channel.associate(exchange)) + channel.send(); + else + channel.release(); } @Override diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java index 2ef43b75bf8..9fa4ac7c2e8 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpChannelOverSPDY.java @@ -64,29 +64,15 @@ public class HttpChannelOverSPDY extends HttpChannel } @Override - public void proceed(HttpExchange exchange, Throwable failure) + public void release() { - sender.proceed(exchange, failure); - } - - @Override - public boolean abort(Throwable cause) - { - boolean sendAborted = sender.abort(cause); - boolean receiveAborted = abortResponse(cause); - return sendAborted || receiveAborted; - } - - @Override - public boolean abortResponse(Throwable cause) - { - return receiver.abort(cause); - } - - @Override - public void exchangeTerminated(Result result) - { - super.exchangeTerminated(result); connection.release(this); } + + @Override + public void exchangeTerminated(HttpExchange exchange, Result result) + { + super.exchangeTerminated(exchange, result); + release(); + } } diff --git a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java index 4adc5b367ea..a3902d302f9 100644 --- a/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java +++ b/jetty-spdy/spdy-http-client-transport/src/main/java/org/eclipse/jetty/spdy/client/http/HttpConnectionOverSPDY.java @@ -48,8 +48,10 @@ public class HttpConnectionOverSPDY extends HttpConnection // One connection maps to N channels, so for each exchange we create a new channel HttpChannel channel = new HttpChannelOverSPDY(getHttpDestination(), this, session); channels.add(channel); - channel.associate(exchange); - channel.send(); + if (channel.associate(exchange)) + channel.send(); + else + channel.release(); } protected void release(HttpChannel channel)