From c8b8ef6bd54401ace582ddf526cb019da9ba8065 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Thu, 11 May 2023 10:58:42 +1000 Subject: [PATCH 1/3] Issue #9682 - fix RetainableByteBuffer release bug in WebSocket Signed-off-by: Lachlan Roberts --- .../core/internal/WebSocketConnection.java | 9 +++ .../common/JettyWebSocketFrameHandler.java | 28 ++++++--- .../websocket/tests/SuspendResumeTest.java | 62 ++++++++++++++++++- 3 files changed, 86 insertions(+), 13 deletions(-) diff --git a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java index db43e1fb1f1..b822cc2a8b9 100644 --- a/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java +++ b/jetty-websocket/websocket-core-common/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketConnection.java @@ -220,6 +220,15 @@ public class WebSocketConnection extends AbstractConnection implements Connectio if (!coreSession.isClosed()) coreSession.onEof(); flusher.onClose(cause); + + try (AutoLock l = lock.lock()) + { + if (networkBuffer != null) + { + networkBuffer.clear(); + releaseNetworkBuffer(); + } + } super.onClose(cause); } diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 3ea47038a48..b01abb9b5bd 100644 --- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -81,8 +81,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler private MessageSink activeMessageSink; private WebSocketSession session; private SuspendState state = SuspendState.DEMANDING; - private Runnable delayedOnFrame; - private CoreSession coreSession; + private Frame delayedFrame; + private Callback delayedCallback; public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, @@ -151,7 +151,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler try { customizer.customize(coreSession); - this.coreSession = coreSession; session = new WebSocketSession(container, coreSession, this); if (!session.isOpen()) throw new IllegalStateException("Session is not open"); @@ -199,7 +198,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler break; case SUSPENDING: - delayedOnFrame = () -> onFrame(frame, callback); + delayedFrame = frame; + delayedCallback = callback; state = SuspendState.SUSPENDED; return; @@ -283,12 +283,19 @@ public class JettyWebSocketFrameHandler implements FrameHandler @Override public void onClosed(CloseStatus closeStatus, Callback callback) { + Callback suspendedCallback; try (AutoLock l = lock.lock()) { // We are now closed and cannot suspend or resume. state = SuspendState.CLOSED; + delayedFrame = null; + suspendedCallback = delayedCallback; + delayedCallback = null; } + if (suspendedCallback != null) + suspendedCallback.failed(new CloseException(closeStatus.getCode(), closeStatus.getCause())); + notifyOnClose(closeStatus, callback); container.notifySessionListeners((listener) -> listener.onWebSocketSessionClosed(session)); } @@ -447,7 +454,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler public void resume() { boolean needDemand = false; - Runnable delayedFrame = null; + Frame frame = null; + Callback callback = null; try (AutoLock l = lock.lock()) { switch (state) @@ -457,13 +465,13 @@ public class JettyWebSocketFrameHandler implements FrameHandler case SUSPENDED: needDemand = true; - delayedFrame = delayedOnFrame; - delayedOnFrame = null; + frame = delayedFrame; + callback = delayedCallback; state = SuspendState.DEMANDING; break; case SUSPENDING: - if (delayedOnFrame != null) + if (delayedFrame != null) throw new IllegalStateException(); state = SuspendState.DEMANDING; break; @@ -475,8 +483,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler if (needDemand) { - if (delayedFrame != null) - delayedFrame.run(); + if (frame != null) + onFrame(frame, callback); else session.getCoreSession().demand(1); } diff --git a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java index 93ba52d0762..e7fcf27a51b 100644 --- a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java +++ b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java @@ -15,16 +15,21 @@ package org.eclipse.jetty.websocket.tests; import java.io.IOException; import java.net.URI; +import java.time.Duration; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import org.eclipse.jetty.io.ArrayRetainableByteBufferPool; import org.eclipse.jetty.server.Server; import org.eclipse.jetty.server.ServerConnector; import org.eclipse.jetty.servlet.ServletContextHandler; import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.api.exceptions.WebSocketTimeoutException; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.server.JettyWebSocketServlet; import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory; @@ -34,7 +39,10 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -64,14 +72,15 @@ public class SuspendResumeTest } } - private Server server = new Server(); - private WebSocketClient client = new WebSocketClient(); - private SuspendSocket serverSocket = new SuspendSocket(); + private Server server; + private WebSocketClient client; + private SuspendSocket serverSocket; private ServerConnector connector; @BeforeEach public void start() throws Exception { + server = new Server(); connector = new ServerConnector(server); server.addConnector(connector); @@ -79,10 +88,12 @@ public class SuspendResumeTest contextHandler.setContextPath("/"); server.setHandler(contextHandler); contextHandler.addServlet(new ServletHolder(new UpgradeServlet()), "/suspend"); + serverSocket = new SuspendSocket(); JettyWebSocketServletContainerInitializer.configure(contextHandler, null); server.start(); + client = new WebSocketClient(); client.start(); } @@ -189,4 +200,49 @@ public class SuspendResumeTest // suspend after closed throws ISE assertThrows(IllegalStateException.class, () -> clientSocket.session.suspend()); } + + @Test + public void timeoutWhileSuspended() throws Exception + { + URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/suspend"); + EventSocket clientSocket = new EventSocket(); + Future connect = client.connect(clientSocket, uri); + connect.get(5, TimeUnit.SECONDS); + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + + // Set short idleTimeout on server. + int idleTimeout = 1000; + serverSocket.session.setIdleTimeout(Duration.ofMillis(idleTimeout)); + + // Suspend on the server. + clientSocket.session.getRemote().sendString("suspend"); + assertThat(serverSocket.textMessages.poll(5, TimeUnit.SECONDS), is("suspend")); + + // Send two messages, with batching on, so they are read into same network buffer on the server. + // First frame is read and delayed inside the JettyWebSocketFrameHandler suspendState, second frame remains in the network buffer. + clientSocket.session.getRemote().setBatchMode(BatchMode.ON); + clientSocket.session.getRemote().sendString("no demand"); + clientSocket.session.getRemote().sendString("this should sit in network buffer"); + clientSocket.session.getRemote().flush(); + assertNotNull(serverSocket.suspendToken); + + // Make sure both sides are closed. + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + + // We received no additional messages. + assertNull(serverSocket.textMessages.poll()); + assertNull(serverSocket.binaryMessages.poll()); + + // Check the idleTimeout occurred. + assertThat(serverSocket.error, instanceOf(WebSocketTimeoutException.class)); + assertNull(clientSocket.error); + assertThat(clientSocket.closeCode, equalTo(StatusCode.SHUTDOWN)); + assertThat(clientSocket.closeReason, equalTo("Connection Idle Timeout")); + + // We should have no used buffers in the pool. + ArrayRetainableByteBufferPool pool = (ArrayRetainableByteBufferPool)connector.getByteBufferPool().asRetainableByteBufferPool(); + assertThat(pool.getHeapByteBufferCount(), equalTo(pool.getAvailableHeapByteBufferCount())); + assertThat(pool.getDirectByteBufferCount(), equalTo(pool.getAvailableDirectByteBufferCount())); + } } From 7f7551a2a88560e44de4b1aa0ac13757f0c838b7 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Fri, 12 May 2023 04:44:06 +1000 Subject: [PATCH 2/3] Issue #9682 - changes from review Signed-off-by: Lachlan Roberts --- .../jetty/websocket/common/JettyWebSocketFrameHandler.java | 2 ++ .../org/eclipse/jetty/websocket/tests/SuspendResumeTest.java | 2 +- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index b01abb9b5bd..de40ce96107 100644 --- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -467,6 +467,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler needDemand = true; frame = delayedFrame; callback = delayedCallback; + delayedFrame = null; + delayedCallback = null; state = SuspendState.DEMANDING; break; diff --git a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java index e7fcf27a51b..6a7f3414d92 100644 --- a/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java +++ b/jetty-websocket/websocket-jetty-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java @@ -202,7 +202,7 @@ public class SuspendResumeTest } @Test - public void timeoutWhileSuspended() throws Exception + public void testTimeoutWhileSuspended() throws Exception { URI uri = new URI("ws://localhost:" + connector.getLocalPort() + "/suspend"); EventSocket clientSocket = new EventSocket(); From c51ccd1faf4686464c7ba0b8b8d74148b8307ce6 Mon Sep 17 00:00:00 2001 From: Lachlan Roberts Date: Mon, 15 May 2023 10:12:57 +1000 Subject: [PATCH 3/3] Issue #9682 - changes after review Signed-off-by: Lachlan Roberts --- .../common/JettyWebSocketFrameHandler.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index de40ce96107..b428de3f2c3 100644 --- a/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/websocket-jetty-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -198,6 +198,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler break; case SUSPENDING: + assert (delayedFrame == null && delayedCallback == null); delayedFrame = frame; delayedCallback = callback; state = SuspendState.SUSPENDED; @@ -283,18 +284,18 @@ public class JettyWebSocketFrameHandler implements FrameHandler @Override public void onClosed(CloseStatus closeStatus, Callback callback) { - Callback suspendedCallback; + Callback delayedCallback; try (AutoLock l = lock.lock()) { // We are now closed and cannot suspend or resume. state = SuspendState.CLOSED; - delayedFrame = null; - suspendedCallback = delayedCallback; - delayedCallback = null; + this.delayedFrame = null; + delayedCallback = this.delayedCallback; + this.delayedCallback = null; } - if (suspendedCallback != null) - suspendedCallback.failed(new CloseException(closeStatus.getCode(), closeStatus.getCause())); + if (delayedCallback != null) + delayedCallback.failed(new CloseException(closeStatus.getCode(), closeStatus.getCause())); notifyOnClose(closeStatus, callback); container.notifySessionListeners((listener) -> listener.onWebSocketSessionClosed(session));