From 11b60db4c36d3bf294bacfb16872588101a265dd Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 25 Sep 2019 12:13:56 +1000 Subject: [PATCH 1/2] Issue #4047 Graceful Write (#4100) Added test to reproduce issue Fixed bug from #2772 where output was shutdown on DONE without checking for END. Fixed aggregation logic to aggregate last write if aggregation already started Improved comments and clarify conditions Signed-off-by: Greg Wilkins --- .../org/eclipse/jetty/http/HttpGenerator.java | 2 +- .../eclipse/jetty/server/HttpConnection.java | 10 ++- .../org/eclipse/jetty/server/HttpOutput.java | 9 +- .../jetty/server/GracefulStopTest.java | 86 +++++++++++++++++++ .../jetty/util/component/Graceful.java | 24 +++++- 5 files changed, 121 insertions(+), 10 deletions(-) diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index e23bb1bad04..e772d39ea67 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -78,7 +78,7 @@ public class HttpGenerator FLUSH, // The buffers previously generated should be flushed CONTINUE, // Continue generating the message SHUTDOWN_OUT, // Need EOF to be signaled - DONE // Message generation complete + DONE // The current phase of generation is complete } // other statics diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 80c31516e00..4e80aab1150 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -732,9 +732,9 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http { HttpGenerator.Result result = _generator.generateResponse(_info, _head, _header, chunk, _content, _lastContent); if (LOG.isDebugEnabled()) - LOG.debug("{} generate: {} ({},{},{})@{}", - this, + LOG.debug("generate: {} for {} ({},{},{})@{}", result, + this, BufferUtil.toSummaryString(_header), BufferUtil.toSummaryString(_content), _lastContent, @@ -826,8 +826,10 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http } case DONE: { - // If shutdown after commit, we can still close here. - if (getConnector().isShutdown()) + // If this is the end of the response and the connector was shutdown after response was committed, + // we can't add the Connection:close header, but we are still allowed to close the connection + // by shutting down the output. + if (getConnector().isShutdown() && _generator.isEnd() && _generator.isPersistent()) _shutdownOut = true; return Action.SUCCEEDED; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java index 655e8cd5dbd..3da6c658ce6 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpOutput.java @@ -591,17 +591,18 @@ public class HttpOutput extends ServletOutputStream implements Runnable // handle blocking write // Should we aggregate? - int capacity = getBufferSize(); + // Yes - if the write is smaller than the commitSize (==aggregate buffer size) + // and the write is not the last one, or is last but will fit in an already allocated aggregate buffer. boolean last = isLastContentToWrite(len); - if (!last && len <= _commitSize) + if (len <= _commitSize && (!last || len <= BufferUtil.space(_aggregate))) { acquireBuffer(); // YES - fill the aggregate with content from the buffer int filled = BufferUtil.fill(_aggregate, b, off, len); - // return if we are not complete, not full and filled all the content - if (filled == len && !BufferUtil.isFull(_aggregate)) + // return if we are not the last write and have aggregated all of the content + if (!last && filled == len && !BufferUtil.isFull(_aggregate)) return; // adjust offset/length diff --git a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java index 3b365ac95e9..fd889fcbfbe 100644 --- a/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java +++ b/jetty-server/src/test/java/org/eclipse/jetty/server/GracefulStopTest.java @@ -56,11 +56,13 @@ import org.junit.jupiter.api.condition.DisabledOnOs; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.endsWith; import static org.hamcrest.Matchers.greaterThan; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.lessThan; import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.startsWith; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -166,6 +168,66 @@ public class GracefulStopTest client.close(); } + + /** + * Test completed writes during shutdown do not close output + * @throws Exception on test failure + */ + @Test + public void testWriteDuringShutdown() throws Exception + { + Server server = new Server(); + server.setStopTimeout(1000); + + ServerConnector connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ABHandler handler = new ABHandler(); + StatisticsHandler stats = new StatisticsHandler(); + server.setHandler(stats); + stats.setHandler(handler); + + server.start(); + + Thread stopper = new Thread(() -> + { + try + { + handler.latchA.await(); + server.stop(); + } + catch (Exception e) + { + e.printStackTrace(); + } + }); + stopper.start(); + + final int port = connector.getLocalPort(); + try(Socket client = new Socket("127.0.0.1", port)) + { + client.getOutputStream().write(( + "GET / HTTP/1.1\r\n" + + "Host: localhost:" + port + "\r\n" + + "\r\n" + ).getBytes()); + client.getOutputStream().flush(); + + while (!connector.isShutdown()) + Thread.sleep(10); + + handler.latchB.countDown(); + + String response = IO.toString(client.getInputStream()); + assertThat(response, startsWith("HTTP/1.1 200 ")); + assertThat(response, containsString("Content-Length: 2")); + assertThat(response, containsString("Connection: close")); + assertThat(response, endsWith("ab")); + } + stopper.join(); + } + /** * Test of standard graceful timeout mechanism when a block request does * complete. Note that even though the request completes after 100ms, the @@ -736,6 +798,30 @@ public class GracefulStopTest } } + static class ABHandler extends AbstractHandler + { + final CountDownLatch latchA = new CountDownLatch(1); + final CountDownLatch latchB = new CountDownLatch(1); + + @Override + public void handle(String s, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException + { + response.setContentLength(2); + response.getOutputStream().write("a".getBytes()); + try + { + latchA.countDown(); + latchB.await(); + } + catch (InterruptedException e) + { + throw new RuntimeException(e); + } + response.flushBuffer(); + response.getOutputStream().write("b".getBytes()); + } + } + static class TestHandler extends AbstractHandler { final CountDownLatch latch = new CountDownLatch(1); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java index 541f39933f6..7408e042e17 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/component/Graceful.java @@ -21,9 +21,25 @@ package org.eclipse.jetty.util.component; import java.util.concurrent.Future; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FutureCallback; -/* A Lifecycle that can be gracefully shutdown. +/** + *

Jetty components that wish to be part of a Graceful shutdown implement this interface so that + * the {@link Graceful#shutdown()} method will be called to initiate a shutdown. Shutdown operations + * can fall into the following categories:

+ * + *

The {@link Future} returned by the the shutdown call will be completed to indicate the shutdown operation is completed. + * Some shutdown operations may be instantaneous and always return a completed future. + *

+ * Graceful shutdown is typically orchestrated by the doStop methods of Server or ContextHandler (for a full or partial + * shutdown respectively). + *

*/ public interface Graceful { @@ -31,6 +47,12 @@ public interface Graceful boolean isShutdown(); + /** + * A utility Graceful that uses a {@link FutureCallback} to indicate if shutdown is completed. + * By default the {@link FutureCallback} is returned as already completed, but the {@link #newShutdownCallback()} method + * can be overloaded to return a non-completed callback that will require a {@link Callback#succeeded()} or + * {@link Callback#failed(Throwable)} call to be completed. + */ class Shutdown implements Graceful { private final AtomicReference _shutdown = new AtomicReference<>(); From 3edc6c9102effadbf73e647a344e2e6df1fbfa9a Mon Sep 17 00:00:00 2001 From: Lachlan Date: Wed, 25 Sep 2019 14:55:13 +1000 Subject: [PATCH 2/2] Issue #3734 - throw ISE for WebSocket suspend after close (jetty-9.4) (#4098) * Issue #3734 - throw ISE for WebSocket suspend after close Signed-off-by: Lachlan Roberts * Issue #3734 - suspend is error if onClose() has been called Signed-off-by: Lachlan Roberts --- .../jetty/websocket/tests/SuspendResumeTest.java | 6 +++--- .../jetty/websocket/common/WebSocketSession.java | 11 ++++++----- .../eclipse/jetty/websocket/common/io/ReadState.java | 10 ++-------- .../jetty/websocket/common/io/ReadStateTest.java | 9 ++++----- 4 files changed, 15 insertions(+), 21 deletions(-) diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java index 184d57aa2c6..fbfdeca6f88 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java @@ -41,6 +41,7 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; public class SuspendResumeTest @@ -195,8 +196,7 @@ public class SuspendResumeTest assertThat(clientSocket.closeCode, is(StatusCode.NORMAL)); assertThat(serverSocket.closeCode, is(StatusCode.NORMAL)); - // suspend the client so that no read events occur - SuspendToken suspendToken = clientSocket.session.suspend(); - suspendToken.resume(); + // suspend after closed throws ISE + assertThrows(IllegalStateException.class, () -> clientSocket.session.suspend()); } } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 700ba25a55c..c4dc03c7a58 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -68,7 +68,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem private final EventDriver websocket; private final Executor executor; private final WebSocketPolicy policy; - private final AtomicBoolean closed = new AtomicBoolean(); + private final AtomicBoolean onCloseCalled = new AtomicBoolean(false); private ClassLoader classLoader; private ExtensionFactory extensionFactory; private RemoteEndpointFactory remoteEndpointFactory; @@ -80,7 +80,6 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem private UpgradeRequest upgradeRequest; private UpgradeResponse upgradeResponse; private CompletableFuture openFuture; - private AtomicBoolean onCloseCalled = new AtomicBoolean(false); public WebSocketSession(WebSocketContainerScope containerScope, URI requestURI, EventDriver websocket, LogicalConnection connection) { @@ -338,10 +337,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem public boolean isOpen() { if (this.connection == null) - { return false; - } - return !closed.get() && this.connection.isOpen(); + + return !onCloseCalled.get() && this.connection.isOpen(); } @Override @@ -546,6 +544,9 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem @Override public SuspendToken suspend() { + if (onCloseCalled.get()) + throw new IllegalStateException("Not open"); + return connection.suspend(); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/ReadState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/ReadState.java index e80937700d5..20752a50a14 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/ReadState.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/ReadState.java @@ -87,10 +87,8 @@ class ReadState /** * Requests that reads from the connection be suspended. - * - * @return whether the suspending was successful */ - boolean suspending() + void suspending() { synchronized (this) { @@ -101,9 +99,7 @@ class ReadState { case READING: state = State.SUSPENDING; - return true; - case EOF: - return false; + break; default: throw new IllegalStateException(toString(state)); } @@ -131,8 +127,6 @@ class ReadState ByteBuffer bb = buffer; buffer = null; return bb; - case EOF: - return null; default: throw new IllegalStateException(toString(state)); } diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/ReadStateTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/ReadStateTest.java index 5fdc72e7e77..ea27653275d 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/ReadStateTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/io/ReadStateTest.java @@ -27,7 +27,6 @@ import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.junit.jupiter.api.Assertions.assertTrue; public class ReadStateTest { @@ -50,7 +49,7 @@ public class ReadStateTest ReadState readState = new ReadState(); assertThat("Initially reading", readState.isReading(), is(true)); - assertTrue(readState.suspending()); + readState.suspending(); assertThat("Suspending doesn't take effect immediately", readState.isSuspended(), is(false)); assertNull(readState.resume()); @@ -64,7 +63,7 @@ public class ReadStateTest ReadState readState = new ReadState(); assertThat("Initially reading", readState.isReading(), is(true)); - assertThat(readState.suspending(), is(true)); + readState.suspending(); assertThat("Suspending doesn't take effect immediately", readState.isSuspended(), is(false)); ByteBuffer content = BufferUtil.toBuffer("content"); @@ -84,8 +83,8 @@ public class ReadStateTest assertThat(readState.isReading(), is(false)); assertThat(readState.isSuspended(), is(true)); - assertThat(readState.suspending(), is(false)); + assertThrows(IllegalStateException.class, readState::suspending); assertThat(readState.getAction(content), is(ReadState.Action.EOF)); - assertNull(readState.resume()); + assertThrows(IllegalStateException.class, readState::resume); } }