Issue #3382 - implement Session.suspend() for jetty 10 websocket-api

Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
lachan-roberts 2019-03-29 15:01:37 +11:00
parent 717d7300ac
commit 8f29ea04cd
4 changed files with 291 additions and 18 deletions

View File

@ -18,12 +18,12 @@
package org.eclipse.jetty.websocket.api; package org.eclipse.jetty.websocket.api;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import java.io.Closeable; import java.io.Closeable;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketAddress; import java.net.SocketAddress;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
/** /**
* Session represents an active link of communications with a Remote WebSocket Endpoint. * Session represents an active link of communications with a Remote WebSocket Endpoint.
*/ */
@ -164,7 +164,11 @@ public interface Session extends WebSocketPolicy, Closeable
/** /**
* Suspend the incoming read events on the connection. * Suspend the incoming read events on the connection.
* * <p>
* This should be called during the processing of a frame or message to successfully
* suspend read events before the next frame is received. Calling suspend outside of
* this will only suspend read events after the next frame has been received.
* </p>
* @return the suspend token suitable for resuming the reading of data on the connection. * @return the suspend token suitable for resuming the reading of data on the connection.
*/ */
SuspendToken suspend(); SuspendToken suspend();

View File

@ -45,6 +45,13 @@ import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
public class JettyWebSocketFrameHandler implements FrameHandler public class JettyWebSocketFrameHandler implements FrameHandler
{ {
private enum SuspendState
{
DEMANDING,
SUSPENDING,
SUSPENDED
}
private final Logger log; private final Logger log;
private final WebSocketContainer container; private final WebSocketContainer container;
private final Object endpointInstance; private final Object endpointInstance;
@ -72,6 +79,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
private MessageSink binarySink; private MessageSink binarySink;
private MessageSink activeMessageSink; private MessageSink activeMessageSink;
private WebSocketSession session; private WebSocketSession session;
private SuspendState state = SuspendState.DEMANDING;
public JettyWebSocketFrameHandler(WebSocketContainer container, public JettyWebSocketFrameHandler(WebSocketContainer container,
Object endpointInstance, Object endpointInstance,
@ -147,6 +155,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
callback.succeeded(); callback.succeeded();
futureSession.complete(session); futureSession.complete(session);
demand();
} }
catch (Throwable cause) catch (Throwable cause)
{ {
@ -155,11 +164,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler
} }
} }
/**
* @see #onFrame(Frame,Callback)
*/
public final void onFrame(Frame frame) {}
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
@ -176,25 +180,34 @@ public class JettyWebSocketFrameHandler implements FrameHandler
} }
} }
// Demand after succeeding any received frame
Callback demandingCallback = Callback.from(()->
{
callback.succeeded();
demand();
},
callback::failed
);
switch (frame.getOpCode()) switch (frame.getOpCode())
{ {
case OpCode.CLOSE: case OpCode.CLOSE:
onCloseFrame(frame, callback); onCloseFrame(frame, demandingCallback);
break; break;
case OpCode.PING: case OpCode.PING:
onPingFrame(frame, callback); onPingFrame(frame, demandingCallback);
break; break;
case OpCode.PONG: case OpCode.PONG:
onPongFrame(frame, callback); onPongFrame(frame, demandingCallback);
break; break;
case OpCode.TEXT: case OpCode.TEXT:
onTextFrame(frame, callback); onTextFrame(frame, demandingCallback);
break; break;
case OpCode.BINARY: case OpCode.BINARY:
onBinaryFrame(frame, callback); onBinaryFrame(frame, demandingCallback);
break; break;
case OpCode.CONTINUATION: case OpCode.CONTINUATION:
onContinuationFrame(frame, callback); onContinuationFrame(frame, demandingCallback);
break; break;
} }
} }
@ -337,6 +350,79 @@ public class JettyWebSocketFrameHandler implements FrameHandler
acceptMessage(frame, callback); acceptMessage(frame, callback);
} }
@Override
public boolean isDemanding()
{
return true;
}
public void suspend()
{
synchronized (this)
{
switch(state)
{
case DEMANDING:
state = SuspendState.SUSPENDING;
break;
case SUSPENDED:
case SUSPENDING:
throw new IllegalStateException("Already Suspended");
default:
throw new IllegalStateException();
}
}
}
public void resume()
{
synchronized (this)
{
switch(state)
{
case DEMANDING:
throw new IllegalStateException("Already Resumed");
case SUSPENDED:
state = SuspendState.DEMANDING;
session.getCoreSession().demand(1);
break;
case SUSPENDING:
state = SuspendState.DEMANDING;
break;
default:
throw new IllegalStateException();
}
}
}
private void demand()
{
synchronized (this)
{
switch(state)
{
case DEMANDING:
session.getCoreSession().demand(1);
break;
case SUSPENDED:
throw new IllegalStateException("Suspended");
case SUSPENDING:
state = SuspendState.SUSPENDED;
break;
default:
throw new IllegalStateException();
}
}
}
static Throwable convertCause(Throwable cause) static Throwable convertCause(Throwable cause)
{ {
if (cause instanceof MessageTooLargeException) if (cause instanceof MessageTooLargeException)
@ -362,5 +448,4 @@ public class JettyWebSocketFrameHandler implements FrameHandler
return cause; return cause;
} }
} }

View File

@ -37,7 +37,7 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.FrameHandler;
public class WebSocketSession extends AbstractLifeCycle implements Session, Dumpable public class WebSocketSession extends AbstractLifeCycle implements Session, SuspendToken, Dumpable
{ {
private static final Logger LOG = Log.getLogger(WebSocketSession.class); private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private final FrameHandler.CoreSession coreSession; private final FrameHandler.CoreSession coreSession;
@ -208,8 +208,14 @@ public class WebSocketSession extends AbstractLifeCycle implements Session, Dump
@Override @Override
public SuspendToken suspend() public SuspendToken suspend()
{ {
// TODO: frameHandler.suspend();
return null; return this;
}
@Override
public void resume()
{
frameHandler.resume();
} }
public FrameHandler.CoreSession getCoreSession() public FrameHandler.CoreSession getCoreSession()

View File

@ -0,0 +1,178 @@
//
// ========================================================================
// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.tests;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.ServerConnector;
import org.eclipse.jetty.servlet.ServletContextHandler;
import org.eclipse.jetty.servlet.ServletHolder;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketError;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.server.JettyWebSocketServlet;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletContainerInitializer;
import org.eclipse.jetty.websocket.server.JettyWebSocketServletFactory;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class SuspendResumeTest
{
@WebSocket
public static class EventSocket
{
private static final Logger LOG = Log.getLogger(EventSocket.class);
BlockingArrayQueue<String> messages = new BlockingArrayQueue<>();
CountDownLatch openLatch = new CountDownLatch(1);
CountDownLatch closeLatch = new CountDownLatch(1);
AtomicReference<Throwable> error = new AtomicReference<>();
Session session;
@OnWebSocketConnect
public void onConnect(Session session)
{
LOG.info("onConnect(): " + session);
this.session = session;
openLatch.countDown();
}
@OnWebSocketMessage
public void onMessage(String message)
{
LOG.info("onMessage(): " + message);
messages.offer(message);
}
@OnWebSocketError
public void onError(Throwable t)
{
LOG.info("onError(): " + t);
error.compareAndSet(null, t);
}
@OnWebSocketClose
public void onClose(int statusCode, String reason)
{
LOG.info("onClose(): " + statusCode + ":" + reason);
closeLatch.countDown();
}
}
public class UpgradeServlet extends JettyWebSocketServlet
{
@Override
public void configure(JettyWebSocketServletFactory factory)
{
factory.setCreator(((req, resp) -> serverSocket));
}
}
private Server server = new Server();
private WebSocketClient client = new WebSocketClient();
private EventSocket serverSocket = new EventSocket();
private ServerConnector connector;
@BeforeEach
public void start() throws Exception
{
connector = new ServerConnector(server);
connector.setPort(0);
server.addConnector(connector);
ServletContextHandler contextHandler = new ServletContextHandler(ServletContextHandler.SESSIONS);
contextHandler.setContextPath("/");
server.setHandler(contextHandler);
contextHandler.addServlet(new ServletHolder(new UpgradeServlet()), "/test");
JettyWebSocketServletContainerInitializer.configureContext(contextHandler);
server.start();
client.start();
}
@AfterEach
public void stop() throws Exception
{
client.stop();
server.stop();
}
@Test
public void testSuspendResume() throws Exception
{
URI uri = new URI("ws://localhost:"+connector.getLocalPort()+"/test");
EventSocket clientSocket = new EventSocket();
Future<Session> connect = client.connect(clientSocket, uri);
connect.get(5, TimeUnit.SECONDS);
// verify connection by sending a message from server to client
assertTrue(serverSocket.openLatch.await(5, TimeUnit.SECONDS));
serverSocket.session.getRemote().sendStringByFuture("verification");
assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("verification"));
// suspend the client so that no read events occur
SuspendToken suspendToken = clientSocket.session.suspend();
// verify client can still send messages
clientSocket.session.getRemote().sendStringByFuture("message-from-client");
assertThat(serverSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-client"));
// the first message is received as we had already demanded before suspend
serverSocket.session.getRemote().sendStringByFuture("first-message");
assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("first-message"));
// the second message is not received as it is suspended
serverSocket.session.getRemote().sendStringByFuture("second-message");
assertNull(clientSocket.messages.poll(2, TimeUnit.SECONDS));
// client should receive message after it resumes
suspendToken.resume();
assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("second-message"));
// make sure both sides are closed
clientSocket.session.close();
assertTrue(clientSocket.closeLatch.await(5, TimeUnit.SECONDS));
assertTrue(serverSocket.closeLatch.await(5, TimeUnit.SECONDS));
// check no errors occurred
assertNull(clientSocket.error.get());
assertNull(serverSocket.error.get());
}
}