Issue #3382 - delay any frames received while suspended until resumed

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-04-03 17:24:40 +11:00
parent 8f29ea04cd
commit 5eed464730
3 changed files with 30 additions and 16 deletions

View File

@ -164,11 +164,7 @@ public interface Session extends WebSocketPolicy, Closeable
/** /**
* Suspend the incoming read events on the connection. * Suspend the incoming read events on the connection.
* <p> *
* This should be called during the processing of a frame or message to successfully
* suspend read events before the next frame is received. Calling suspend outside of
* this will only suspend read events after the next frame has been received.
* </p>
* @return the suspend token suitable for resuming the reading of data on the connection. * @return the suspend token suitable for resuming the reading of data on the connection.
*/ */
SuspendToken suspend(); SuspendToken suspend();

View File

@ -80,6 +80,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
private MessageSink activeMessageSink; private MessageSink activeMessageSink;
private WebSocketSession session; private WebSocketSession session;
private SuspendState state = SuspendState.DEMANDING; private SuspendState state = SuspendState.DEMANDING;
private Callback suspendedCallback;
public JettyWebSocketFrameHandler(WebSocketContainer container, public JettyWebSocketFrameHandler(WebSocketContainer container,
Object endpointInstance, Object endpointInstance,
@ -167,6 +168,24 @@ public class JettyWebSocketFrameHandler implements FrameHandler
@Override @Override
public void onFrame(Frame frame, Callback callback) public void onFrame(Frame frame, Callback callback)
{ {
synchronized (this)
{
switch(state)
{
case DEMANDING:
break;
case SUSPENDING:
suspendedCallback = Callback.from(()->onFrame(frame, callback));
state = SuspendState.SUSPENDED;
return;
case SUSPENDED:
default:
throw new IllegalStateException();
}
}
// Send to raw frame handling on user side (eg: WebSocketFrameListener) // Send to raw frame handling on user side (eg: WebSocketFrameListener)
if (frameHandle != null) if (frameHandle != null)
{ {
@ -378,18 +397,18 @@ public class JettyWebSocketFrameHandler implements FrameHandler
public void resume() public void resume()
{ {
Callback onFrame;
synchronized (this) synchronized (this)
{ {
onFrame = suspendedCallback;
suspendedCallback = null;
switch(state) switch(state)
{ {
case DEMANDING: case DEMANDING:
throw new IllegalStateException("Already Resumed"); throw new IllegalStateException("Already Resumed");
case SUSPENDED: case SUSPENDED:
state = SuspendState.DEMANDING;
session.getCoreSession().demand(1);
break;
case SUSPENDING: case SUSPENDING:
state = SuspendState.DEMANDING; state = SuspendState.DEMANDING;
break; break;
@ -398,6 +417,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler
throw new IllegalStateException(); throw new IllegalStateException();
} }
} }
if (onFrame != null)
onFrame.succeeded();
} }
private void demand() private void demand()

View File

@ -154,17 +154,13 @@ public class SuspendResumeTest
clientSocket.session.getRemote().sendStringByFuture("message-from-client"); clientSocket.session.getRemote().sendStringByFuture("message-from-client");
assertThat(serverSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-client")); assertThat(serverSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-client"));
// the first message is received as we had already demanded before suspend // the message is not received as it is suspended
serverSocket.session.getRemote().sendStringByFuture("first-message"); serverSocket.session.getRemote().sendStringByFuture("message-from-server");
assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("first-message"));
// the second message is not received as it is suspended
serverSocket.session.getRemote().sendStringByFuture("second-message");
assertNull(clientSocket.messages.poll(2, TimeUnit.SECONDS)); assertNull(clientSocket.messages.poll(2, TimeUnit.SECONDS));
// client should receive message after it resumes // client should receive message after it resumes
suspendToken.resume(); suspendToken.resume();
assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("second-message")); assertThat(clientSocket.messages.poll(5, TimeUnit.SECONDS), is("message-from-server"));
// make sure both sides are closed // make sure both sides are closed
clientSocket.session.close(); clientSocket.session.close();