From 8f29ea04cddd960b12144e054d5271aaf3a0148b Mon Sep 17 00:00:00 2001 From: lachan-roberts Date: Fri, 29 Mar 2019 15:01:37 +1100 Subject: [PATCH] Issue #3382 - implement Session.suspend() for jetty 10 websocket-api Signed-off-by: lachan-roberts --- .../eclipse/jetty/websocket/api/Session.java | 10 +- .../common/JettyWebSocketFrameHandler.java | 109 +++++++++-- .../websocket/common/WebSocketSession.java | 12 +- .../websocket/tests/SuspendResumeTest.java | 178 ++++++++++++++++++ 4 files changed, 291 insertions(+), 18 deletions(-) create mode 100644 jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java diff --git a/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java b/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java index af166e8b517..5c3e3a1a8a5 100644 --- a/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java +++ b/jetty-websocket/jetty-websocket-api/src/main/java/org/eclipse/jetty/websocket/api/Session.java @@ -18,12 +18,12 @@ package org.eclipse.jetty.websocket.api; -import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; - import java.io.Closeable; import java.net.InetSocketAddress; import java.net.SocketAddress; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; + /** * Session represents an active link of communications with a Remote WebSocket Endpoint. */ @@ -164,7 +164,11 @@ public interface Session extends WebSocketPolicy, Closeable /** * Suspend the incoming read events on the connection. - * + *

+ * This should be called during the processing of a frame or message to successfully + * suspend read events before the next frame is received. Calling suspend outside of + * this will only suspend read events after the next frame has been received. + *

* @return the suspend token suitable for resuming the reading of data on the connection. */ SuspendToken suspend(); diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index b9897d76a96..979534e23c4 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -45,6 +45,13 @@ import org.eclipse.jetty.websocket.core.WebSocketTimeoutException; public class JettyWebSocketFrameHandler implements FrameHandler { + private enum SuspendState + { + DEMANDING, + SUSPENDING, + SUSPENDED + } + private final Logger log; private final WebSocketContainer container; private final Object endpointInstance; @@ -72,6 +79,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler private MessageSink binarySink; private MessageSink activeMessageSink; private WebSocketSession session; + private SuspendState state = SuspendState.DEMANDING; public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, @@ -147,6 +155,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler callback.succeeded(); futureSession.complete(session); + demand(); } catch (Throwable cause) { @@ -155,11 +164,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler } } - /** - * @see #onFrame(Frame,Callback) - */ - public final void onFrame(Frame frame) {} - @Override public void onFrame(Frame frame, Callback callback) { @@ -176,25 +180,34 @@ public class JettyWebSocketFrameHandler implements FrameHandler } } + // Demand after succeeding any received frame + Callback demandingCallback = Callback.from(()-> + { + callback.succeeded(); + demand(); + }, + callback::failed + ); + switch (frame.getOpCode()) { case OpCode.CLOSE: - onCloseFrame(frame, callback); + onCloseFrame(frame, demandingCallback); break; case OpCode.PING: - onPingFrame(frame, callback); + onPingFrame(frame, demandingCallback); break; case OpCode.PONG: - onPongFrame(frame, callback); + onPongFrame(frame, demandingCallback); break; case OpCode.TEXT: - onTextFrame(frame, callback); + onTextFrame(frame, demandingCallback); break; case OpCode.BINARY: - onBinaryFrame(frame, callback); + onBinaryFrame(frame, demandingCallback); break; case OpCode.CONTINUATION: - onContinuationFrame(frame, callback); + onContinuationFrame(frame, demandingCallback); break; } } @@ -337,6 +350,79 @@ public class JettyWebSocketFrameHandler implements FrameHandler acceptMessage(frame, callback); } + @Override + public boolean isDemanding() + { + return true; + } + + public void suspend() + { + synchronized (this) + { + switch(state) + { + case DEMANDING: + state = SuspendState.SUSPENDING; + break; + + case SUSPENDED: + case SUSPENDING: + throw new IllegalStateException("Already Suspended"); + + default: + throw new IllegalStateException(); + } + } + } + + public void resume() + { + synchronized (this) + { + switch(state) + { + case DEMANDING: + throw new IllegalStateException("Already Resumed"); + + case SUSPENDED: + state = SuspendState.DEMANDING; + session.getCoreSession().demand(1); + break; + + case SUSPENDING: + state = SuspendState.DEMANDING; + break; + + default: + throw new IllegalStateException(); + } + } + } + + private void demand() + { + synchronized (this) + { + switch(state) + { + case DEMANDING: + session.getCoreSession().demand(1); + break; + + case SUSPENDED: + throw new IllegalStateException("Suspended"); + + case SUSPENDING: + state = SuspendState.SUSPENDED; + break; + + default: + throw new IllegalStateException(); + } + } + } + static Throwable convertCause(Throwable cause) { if (cause instanceof MessageTooLargeException) @@ -362,5 +448,4 @@ public class JettyWebSocketFrameHandler implements FrameHandler return cause; } - } diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 50d937ee3ea..d6fd3406c02 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -37,7 +37,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.core.FrameHandler; -public class WebSocketSession extends AbstractLifeCycle implements Session, Dumpable +public class WebSocketSession extends AbstractLifeCycle implements Session, SuspendToken, Dumpable { private static final Logger LOG = Log.getLogger(WebSocketSession.class); private final FrameHandler.CoreSession coreSession; @@ -208,8 +208,14 @@ public class WebSocketSession extends AbstractLifeCycle implements Session, Dump @Override public SuspendToken suspend() { - // TODO: - return null; + frameHandler.suspend(); + return this; + } + + @Override + public void resume() + { + frameHandler.resume(); } public FrameHandler.CoreSession getCoreSession() diff --git a/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java new file mode 100644 index 00000000000..3b75c1b7a3f --- /dev/null +++ b/jetty-websocket/jetty-websocket-tests/src/test/java/org/eclipse/jetty/websocket/tests/SuspendResumeTest.java @@ -0,0 +1,178 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.tests; + +import java.net.URI; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.servlet.ServletContextHandler; +import org.eclipse.jetty.servlet.ServletHolder; +import org.eclipse.jetty.util.BlockingArrayQueue; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.Session; +import org.eclipse.jetty.websocket.api.SuspendToken; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError; +import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; +import org.eclipse.jetty.websocket.api.annotations.WebSocket; +import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.server.JettyWebSocketServlet; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer; +import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class SuspendResumeTest +{ + @WebSocket + public static class EventSocket + { + private static final Logger LOG = Log.getLogger(EventSocket.class); + + + BlockingArrayQueue messages = new BlockingArrayQueue<>(); + CountDownLatch openLatch = new CountDownLatch(1); + CountDownLatch closeLatch = new CountDownLatch(1); + AtomicReference error = new AtomicReference<>(); + Session session; + + @OnWebSocketConnect + public void onConnect(Session session) + { + LOG.info("onConnect(): " + session); + this.session = session; + openLatch.countDown(); + } + + @OnWebSocketMessage + public void onMessage(String message) + { + LOG.info("onMessage(): " + message); + messages.offer(message); + } + + @OnWebSocketError + public void onError(Throwable t) + { + LOG.info("onError(): " + t); + error.compareAndSet(null, t); + } + + @OnWebSocketClose + public void onClose(int statusCode, String reason) + { + LOG.info("onClose(): " + statusCode + ":" + reason); + closeLatch.countDown(); + } + } + + public class UpgradeServlet extends JettyWebSocketServlet + { + @Override + public void configure(JettyWebSocketServletFactory factory) + { + factory.setCreator(((req, resp) -> serverSocket)); + } + } + + private Server server = new Server(); + private WebSocketClient client = new WebSocketClient(); + private EventSocket serverSocket = new EventSocket(); + private ServerConnector connector; + + @BeforeEach + public void start() throws Exception + { + connector = new ServerConnector(server); + connector.setPort(0); + server.addConnector(connector); + + ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS); + contextHandler.setContextPath("/"); + server.setHandler(contextHandler); + contextHandler.addServlet(new ServletHolder(new UpgradeServlet()), "/test"); + + JettyWebSocketServletContainerInitializer.configureContext(contextHandler); + + server.start(); + client.start(); + } + + @AfterEach + public void stop() throws Exception + { + client.stop(); + server.stop(); + } + + @Test + public void testSuspendResume() throws Exception + { + URI uri = new URI("ws://localhost:"+connector.getLocalPort()+"/test"); + EventSocket clientSocket = new EventSocket(); + Future connect = client.connect(clientSocket, uri); + connect.get(5, TimeUnit.SECONDS); + + // verify connection by sending a message from server to client + assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS)); + serverSocket.session.getRemote().sendStringByFuture("verification"); + assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("verification")); + + // suspend the client so that no read events occur + SuspendToken suspendToken = clientSocket.session.suspend(); + + // verify client can still send messages + clientSocket.session.getRemote().sendStringByFuture("message-from-client"); + assertThat(serverSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-client")); + + // the first message is received as we had already demanded before suspend + serverSocket.session.getRemote().sendStringByFuture("first-message"); + assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("first-message")); + + // the second message is not received as it is suspended + serverSocket.session.getRemote().sendStringByFuture("second-message"); + assertNull(clientSocket.messages.poll(2, TimeUnit.SECONDS)); + + // client should receive message after it resumes + suspendToken.resume(); + assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("second-message")); + + // make sure both sides are closed + clientSocket.session.close(); + assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS)); + assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS)); + + // check no errors occurred + assertNull(clientSocket.error.get()); + assertNull(serverSocket.error.get()); + } +}