diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 1f787a0356c..9134588cfd9 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -1195,10 +1195,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor if (_length > 0) _combinedListener.onResponseContent(_request, _content); if (_complete && _state.completeResponse()) - { - _response.getHttpOutput().closed(); _combinedListener.onResponseEnd(_request); - } super.succeeded(); } @@ -1222,7 +1219,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor @Override public void failed(Throwable th) { - _response.getHttpOutput().closed(); abort(x); super.failed(x); } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 53e302d6311..bf302759120 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -968,6 +968,9 @@ public class HttpChannelState } } + // release any aggregate buffer from a closing flush + _channel.getResponse().getHttpOutput().closed(); + if (event != null) { cancelTimeout(event); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 70fbf4868e7..f651179862a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -234,6 +234,8 @@ public class HttpOutput extends ServletOutputStream implements Runnable { write(content, complete, blocker); blocker.block(); + if (complete) + closed(); } catch (Exception failure) { @@ -403,13 +405,6 @@ public class HttpOutput extends ServletOutputStream implements Runnable State state = _state.get(); switch (state) { - case CLOSING: - { - if (!_state.compareAndSet(state, State.CLOSED)) - break; - releaseBuffer(); - return; - } case CLOSED: { return; @@ -417,7 +412,12 @@ public class HttpOutput extends ServletOutputStream implements Runnable case UNREADY: { if (_state.compareAndSet(state, State.ERROR)) - _writeListener.onError(_onError == null ? new EofException("Async closed") : _onError); + { + if (_onError == null) + _onError = new EofException("Async closed"); + releaseBuffer(); + return; + } break; } default: diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java index a14a6bcd09f..376b9f33baa 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/AsyncCompletionTest.java @@ -33,6 +33,12 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; +import javax.servlet.AsyncContext; +import javax.servlet.ServletException; +import javax.servlet.ServletOutputStream; +import javax.servlet.WriteListener; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpCompliance; import org.eclipse.jetty.http.HttpTester; @@ -41,6 +47,7 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.ManagedSelector; import org.eclipse.jetty.io.SocketChannelEndPoint; +import org.eclipse.jetty.server.handler.AbstractHandler; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.thread.Scheduler; import org.hamcrest.Matchers; @@ -89,7 +96,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture _delay.get(10, TimeUnit.SECONDS); getCallback().succeeded(); } - catch(Throwable th) + catch (Throwable th) { th.printStackTrace(); getCallback().failed(th); @@ -97,7 +104,6 @@ public class AsyncCompletionTest extends HttpServerTestFixture } } - @BeforeEach public void init() throws Exception { @@ -153,7 +159,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture @Override public void onCompleted() { - COMPLETE.compareAndSet(false,true); + COMPLETE.compareAndSet(false, true); super.onCompleted(); } } @@ -163,7 +169,8 @@ public class AsyncCompletionTest extends HttpServerTestFixture { List tests = new ArrayList<>(); tests.add(new Object[]{new HelloWorldHandler(), 200, "Hello world"}); - tests.add(new Object[]{new SendErrorHandler(499,"Test async sendError"), 499, "Test async sendError"}); + tests.add(new Object[]{new SendErrorHandler(499, "Test async sendError"), 499, "Test async sendError"}); + tests.add(new Object[]{new AsyncReadyCompleteHandler(), 200, AsyncReadyCompleteHandler.data}); return tests.stream().map(Arguments::of); } @@ -197,7 +204,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture // wait for threads to return to base level long end = System.nanoTime() + TimeUnit.SECONDS.toNanos(10); - while(_threadPool.getBusyThreads() != base) + while (_threadPool.getBusyThreads() != base) { if (System.nanoTime() > end) throw new TimeoutException(); @@ -210,7 +217,7 @@ public class AsyncCompletionTest extends HttpServerTestFixture // proceed with the completion delay.proceed(); - while(!COMPLETE.get()) + while (!COMPLETE.get()) { if (System.nanoTime() > end) throw new TimeoutException(); @@ -218,4 +225,46 @@ public class AsyncCompletionTest extends HttpServerTestFixture } } } + + private static class AsyncReadyCompleteHandler extends AbstractHandler + { + static String data = "Now is the time for all good men to come to the aid of the party"; + + @Override + public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException + { + AsyncContext context = request.startAsync(); + ServletOutputStream out = response.getOutputStream(); + out.setWriteListener(new WriteListener() + { + byte[] bytes = data.getBytes(StandardCharsets.ISO_8859_1); + + @Override + public void onWritePossible() throws IOException + { + while (out.isReady()) + { + if (bytes != null) + { + response.setContentType("text/plain"); + response.setContentLength(bytes.length); + out.write(bytes); + bytes = null; + } + else + { + context.complete(); + return; + } + } + } + + @Override + public void onError(Throwable t) + { + t.printStackTrace(); + } + }); + } + } }