diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java index b91652fe517..0106dbb4df8 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpExchange.java @@ -244,7 +244,7 @@ public class HttpExchange implements CyclicTimeouts.Expirable } if (LOG.isDebugEnabled()) - LOG.debug("Failed {}: req={}/rsp={} {}", this, abortRequest, abortResponse, failure); + LOG.debug("Failed {}: req={}/rsp={}", this, abortRequest, abortResponse, failure); if (!abortRequest && !abortResponse) { diff --git a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java index a331eb81371..588734af83c 100644 --- a/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java +++ b/jetty-core/jetty-client/src/main/java/org/eclipse/jetty/client/HttpReceiver.java @@ -31,6 +31,7 @@ import org.eclipse.jetty.io.Content; import org.eclipse.jetty.io.content.ContentSourceTransformer; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.AutoLock; import org.eclipse.jetty.util.thread.SerializedInvoker; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -266,6 +267,9 @@ public abstract class HttpReceiver List responseListeners = exchange.getConversation().getResponseListeners(); notifier.notifyHeaders(responseListeners, response); + if (exchange.isResponseComplete()) + return; + if (HttpStatus.isInterim(response.getStatus())) { if (LOG.isDebugEnabled()) @@ -338,24 +342,18 @@ public abstract class HttpReceiver if (LOG.isDebugEnabled()) LOG.debug("Invoking responseSuccess on {}", this); - NotifiableContentSource contentSource = this.contentSource; - if (contentSource != null) - { - this.contentSource = null; - contentSource.eof(); - } + // Mark atomically the response as completed, with respect + // to concurrency between response success and response failure. + if (!exchange.responseComplete(null)) + return; invoker.run(() -> { if (LOG.isDebugEnabled()) LOG.debug("Executing responseSuccess on {}", this); - // Mark atomically the response as completed, with respect - // to concurrency between response success and response failure. - if (!exchange.responseComplete(null)) - return; - responseState = ResponseState.IDLE; + reset(); HttpResponse response = exchange.getResponse(); @@ -385,34 +383,26 @@ public abstract class HttpReceiver protected void responseFailure(Throwable failure, Promise promise) { if (LOG.isDebugEnabled()) - LOG.debug("Invoking responseFailure with {} on {}", failure, this); + LOG.debug("Failing with {} on {}", failure, this); - invoker.run(() -> + HttpExchange exchange = getHttpExchange(); + // In case of a response error, the failure has already been notified + // and it is possible that a further attempt to read in the receive + // loop throws an exception that reenters here but without exchange; + // or, the server could just have timed out the connection. + if (exchange == null) { - if (LOG.isDebugEnabled()) - LOG.debug("Executing responseFailure on {}", this); + promise.succeeded(false); + return; + } - HttpExchange exchange = getHttpExchange(); - // In case of a response error, the failure has already been notified - // and it is possible that a further attempt to read in the receive - // loop throws an exception that reenters here but without exchange; - // or, the server could just have timed out the connection. - if (exchange == null) - { - promise.succeeded(false); - return; - } - - if (LOG.isDebugEnabled()) - LOG.debug("Response failure {}", exchange.getResponse(), failure); - - // Mark atomically the response as completed, with respect - // to concurrency between response success and response failure. - if (exchange.responseComplete(failure)) - abort(exchange, failure, promise); - else - promise.succeeded(false); - }); + // Mark atomically the response as completed, with respect + // to concurrency between response success and response failure. + boolean completed = exchange.responseComplete(failure); + if (completed) + abort(exchange, failure, promise); + else + promise.succeeded(false); } private void terminateResponse(HttpExchange exchange) @@ -482,10 +472,14 @@ public abstract class HttpReceiver if (LOG.isDebugEnabled()) LOG.debug("Invoking abort with {} on {}", failure, this); + // This method should be called only after calling HttpExchange.responseComplete(). + if (!exchange.isResponseComplete()) + throw new IllegalStateException(); + invoker.run(() -> { if (LOG.isDebugEnabled()) - LOG.debug("Executing abort on {}", this); + LOG.debug("Executing abort with {} on {}", failure, this); if (responseState == ResponseState.FAILURE) { @@ -496,7 +490,7 @@ public abstract class HttpReceiver responseState = ResponseState.FAILURE; this.failure = failure; if (contentSource != null) - contentSource.fail(failure); + contentSource.error(failure); dispose(); HttpResponse response = exchange.getResponse(); @@ -557,7 +551,7 @@ public abstract class HttpReceiver private interface NotifiableContentSource extends Content.Source { - void eof(); + boolean error(Throwable failure); void onDataAvailable(); } @@ -577,12 +571,6 @@ public abstract class HttpReceiver _decoder = decoder; } - @Override - public void eof() - { - _rawSource.eof(); - } - @Override public void onDataAvailable() { @@ -643,6 +631,15 @@ public abstract class HttpReceiver } } } + + @Override + public boolean error(Throwable failure) + { + if (_chunk != null) + _chunk.release(); + _chunk = null; + return _rawSource.error(failure); + } } /** @@ -654,42 +651,37 @@ public abstract class HttpReceiver private static final Logger LOG = LoggerFactory.getLogger(ContentSource.class); private final AtomicReference demandCallbackRef = new AtomicReference<>(); - private volatile Content.Chunk currentChunk; + private final AutoLock lock = new AutoLock(); + private Content.Chunk currentChunk; @Override public Content.Chunk read() { if (LOG.isDebugEnabled()) LOG.debug("Reading from {}", this); - Content.Chunk chunk = consumeCurrentChunk(); - if (chunk != null) - return chunk; - currentChunk = HttpReceiver.this.read(false); - return consumeCurrentChunk(); - } - @Override - public void eof() - { - if (LOG.isDebugEnabled()) - LOG.debug("Setting EOF on {}", this); - if (currentChunk != null) - throw new IllegalStateException(); - currentChunk = Content.Chunk.EOF; - - Runnable demandCallback = demandCallbackRef.getAndSet(null); - if (LOG.isDebugEnabled()) - LOG.debug("Calling demand callback on {}", this); - if (demandCallback != null) + Content.Chunk current; + try (AutoLock ignored = lock.lock()) { - try + current = currentChunk; + currentChunk = Content.Chunk.next(current); + if (current != null) + return current; + } + + current = HttpReceiver.this.read(false); + + try (AutoLock ignored = lock.lock()) + { + if (currentChunk != null) { - demandCallback.run(); - } - catch (Throwable x) - { - fail(x); + // There was a concurrent call to fail(). + if (current != null) + current.release(); + return currentChunk; } + currentChunk = Content.Chunk.next(current); + return current; } } @@ -703,15 +695,6 @@ public abstract class HttpReceiver invokeDemandCallback(true); } - private Content.Chunk consumeCurrentChunk() - { - if (LOG.isDebugEnabled()) - LOG.debug("Consuming current chunk from {}", this); - Content.Chunk chunk = currentChunk; - currentChunk = Content.Chunk.next(chunk); - return chunk; - } - @Override public void demand(Runnable demandCallback) { @@ -731,12 +714,30 @@ public abstract class HttpReceiver if (LOG.isDebugEnabled()) LOG.debug("Processing demand on {}", this); - if (currentChunk == null) + Content.Chunk current; + try (AutoLock ignored = lock.lock()) { - currentChunk = HttpReceiver.this.read(true); - if (currentChunk == null) - return; + current = currentChunk; } + + if (current == null) + { + current = HttpReceiver.this.read(true); + if (current == null) + return; + + try (AutoLock ignored = lock.lock()) + { + if (currentChunk != null) + { + // There was a concurrent call to fail(). + current.release(); + return; + } + currentChunk = current; + } + } + // The processDemand method is only ever called by the // invoker so there is no need to use the latter here. invokeDemandCallback(false); @@ -768,17 +769,40 @@ public abstract class HttpReceiver { if (LOG.isDebugEnabled()) LOG.debug("Failing {}", this); - if (currentChunk != null) - currentChunk.release(); - if (currentChunk == null || !(currentChunk instanceof Content.Chunk.Error)) + boolean failed = error(failure); + if (failed) HttpReceiver.this.failAndClose(failure); - currentChunk = Content.Chunk.from(failure); + invokeDemandCallback(true); + } + + @Override + public boolean error(Throwable failure) + { + if (LOG.isDebugEnabled()) + LOG.debug("Erroring {}", this); + try (AutoLock ignored = lock.lock()) + { + if (currentChunk instanceof Content.Chunk.Error) + return false; + if (currentChunk != null) + currentChunk.release(); + currentChunk = Content.Chunk.from(failure); + } + return true; + } + + private Content.Chunk chunk() + { + try (AutoLock ignored = lock.lock()) + { + return currentChunk; + } } @Override public String toString() { - return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), currentChunk, demandCallbackRef); + return String.format("%s@%x{c=%s,d=%s}", getClass().getSimpleName(), hashCode(), chunk(), demandCallbackRef); } } } diff --git a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java index 0ca4f18051e..9b343cbe3af 100644 --- a/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java +++ b/jetty-core/jetty-http2/jetty-http2-client-transport/src/main/java/org/eclipse/jetty/http2/client/transport/internal/HttpReceiverOverHTTP2.java @@ -92,13 +92,9 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. Stream stream = getHttpChannel().getStream(); responseFailure(failure, Promise.from(failed -> { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - getHttpChannel().getHttpConnection().close(failure); - }, x -> - { - stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); - getHttpChannel().getHttpConnection().close(failure); - })); + if (failed) + stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP); + }, x -> stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), Callback.NOOP))); } @Override @@ -127,15 +123,11 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements HTTP2Channel. httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); responseBegin(exchange); - if (exchange.isResponseComplete()) - return; HttpFields headers = response.getFields(); for (HttpField header : headers) { responseHeader(exchange, header); - if (exchange.isResponseComplete()) - return; } HttpRequest httpRequest = exchange.getRequest(); diff --git a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java index 3fecd5f98a6..e2d431a60c1 100644 --- a/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java +++ b/jetty-core/jetty-http3/jetty-http3-client-transport/src/main/java/org/eclipse/jetty/http3/client/transport/internal/HttpReceiverOverHTTP3.java @@ -23,6 +23,7 @@ import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.MetaData; import org.eclipse.jetty.http3.api.Stream; import org.eclipse.jetty.http3.frames.HeadersFrame; +import org.eclipse.jetty.http3.internal.HTTP3ErrorCode; import org.eclipse.jetty.io.Content; import org.eclipse.jetty.util.Promise; import org.slf4j.Logger; @@ -73,8 +74,12 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client @Override public void failAndClose(Throwable failure) { - responseFailure(failure, Promise.from(failed -> getHttpChannel().getHttpConnection().close(failure), - x -> getHttpChannel().getHttpConnection().close(failure))); + Stream stream = getHttpChannel().getStream(); + responseFailure(failure, Promise.from(failed -> + { + if (failed) + stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure); + }, x -> stream.reset(HTTP3ErrorCode.REQUEST_CANCELLED_ERROR.code(), failure))); } @Override @@ -95,15 +100,11 @@ public class HttpReceiverOverHTTP3 extends HttpReceiver implements Stream.Client httpResponse.version(response.getHttpVersion()).status(response.getStatus()).reason(response.getReason()); responseBegin(exchange); - if (exchange.isResponseComplete()) - return; HttpFields headers = response.getFields(); for (HttpField header : headers) { responseHeader(exchange, header); - if (exchange.isResponseComplete()) - return; } // TODO: add support for HttpMethod.CONNECT. diff --git a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java index 6c801f0b5d5..2b818069005 100644 --- a/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java +++ b/jetty-core/jetty-tests/jetty-test-client-transports/src/test/java/org/eclipse/jetty/test/client/transport/HttpClientDemandTest.java @@ -335,22 +335,22 @@ public class HttpClientDemandTest extends AbstractTest resultLatch.countDown(); }); - await().atMost(5, TimeUnit.SECONDS).until(listener1DemandRef::get, not(nullValue())); - await().atMost(5, TimeUnit.SECONDS).until(listener2DemandRef::get, not(nullValue())); - // Make both listeners progress in locksteps. int i = 0; while (resultLatch.getCount() > 0) { i++; + await().atMost(5, TimeUnit.SECONDS).until(listener1DemandRef::get, not(nullValue())); + await().atMost(5, TimeUnit.SECONDS).until(listener2DemandRef::get, not(nullValue())); + // Assert that no listener can progress for as long as both listeners did not demand. assertThat(listener1Chunks.get(), is(i)); assertThat(listener2Chunks.get(), is(i)); - listener2DemandRef.get().accept(1); + listener2DemandRef.getAndSet(null).accept(1); assertThat(listener1Chunks.get(), is(i)); assertThat(listener2Chunks.get(), is(i)); - listener1DemandRef.get().accept(1); + listener1DemandRef.getAndSet(null).accept(1); } assertTrue(resultLatch.await(5, TimeUnit.SECONDS));