diff --git a/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index 5c3e3a1a8a5..e5e7eecef3b 100644 --- a/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -164,11 +164,7 @@ public interface Session extends WebSocketPolicy, Closeable /** * Suspend the incoming read events on the connection. - *
- * This should be called during the processing of a frame or message to successfully - * suspend read events before the next frame is received. Calling suspend outside of - * this will only suspend read events after the next frame has been received. - *
+ * * @return the suspend token suitable for resuming the reading of data on the connection. */ SuspendToken suspend(); diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 979534e23c4..6e0382ba9ae 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -80,6 +80,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler private MessageSink activeMessageSink; private WebSocketSession session; private SuspendState state = SuspendState.DEMANDING; + private Callback suspendedCallback; public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, @@ -167,6 +168,24 @@ public class JettyWebSocketFrameHandler implements FrameHandler @Override public void onFrame(Frame frame, Callback callback) { + synchronized (this) + { + switch(state) + { + case DEMANDING: + break; + + case SUSPENDING: + suspendedCallback = Callback.from(()->onFrame(frame, callback)); + state = SuspendState.SUSPENDED; + return; + + case SUSPENDED: + default: + throw new IllegalStateException(); + } + } + // Send to raw frame handling on user side (eg: WebSocketFrameListener) if (frameHandle != null) { @@ -378,18 +397,18 @@ public class JettyWebSocketFrameHandler implements FrameHandler public void resume() { + Callback onFrame; synchronized (this) { + onFrame = suspendedCallback; + suspendedCallback = null; + switch(state) { case DEMANDING: throw new IllegalStateException("Already Resumed"); case SUSPENDED: - state = SuspendState.DEMANDING; - session.getCoreSession().demand(1); - break; - case SUSPENDING: state = SuspendState.DEMANDING; break; @@ -398,6 +417,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler throw new IllegalStateException(); } } + + if (onFrame != null) + onFrame.succeeded(); } private void demand() diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java index 3b75c1b7a3f..8ffba97cc5e 100644 --- a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java @@ -154,17 +154,13 @@ public class SuspendResumeTest clientSocket.session.getRemote().sendStringByFuture("message-from-client"); assertThat(serverSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-client")); - // the first message is received as we had already demanded before suspend - serverSocket.session.getRemote().sendStringByFuture("first-message"); - assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("first-message")); - - // the second message is not received as it is suspended - serverSocket.session.getRemote().sendStringByFuture("second-message"); + // the message is not received as it is suspended + serverSocket.session.getRemote().sendStringByFuture("message-from-server"); assertNull(clientSocket.messages.poll(2, TimeUnit.SECONDS)); // client should receive message after it resumes suspendToken.resume(); - assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("second-message")); + assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-server")); // make sure both sides are closed clientSocket.session.close();