Issue #3382 - changes from review
Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
parent
5eed464730
commit
3f0ed2cc1a
|
@ -164,7 +164,11 @@ public interface Session extends WebSocketPolicy, Closeable
|
|||
|
||||
/**
|
||||
* Suspend the incoming read events on the connection.
|
||||
*
|
||||
* <p>
|
||||
* This should be called during the processing of a frame or message to atomically
|
||||
* suspend read events before the next frame is received. Calling suspend outside of
|
||||
* this can result in a frame being received shortly after the suspend call.
|
||||
* </p>
|
||||
* @return the suspend token suitable for resuming the reading of data on the connection.
|
||||
*/
|
||||
SuspendToken suspend();
|
||||
|
|
|
@ -80,7 +80,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
private MessageSink activeMessageSink;
|
||||
private WebSocketSession session;
|
||||
private SuspendState state = SuspendState.DEMANDING;
|
||||
private Callback suspendedCallback;
|
||||
private Runnable delayedOnFrame;
|
||||
|
||||
public JettyWebSocketFrameHandler(WebSocketContainer container,
|
||||
Object endpointInstance,
|
||||
|
@ -176,7 +176,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
break;
|
||||
|
||||
case SUSPENDING:
|
||||
suspendedCallback = Callback.from(()->onFrame(frame, callback));
|
||||
delayedOnFrame = ()->onFrame(frame, callback);
|
||||
state = SuspendState.SUSPENDED;
|
||||
return;
|
||||
|
||||
|
@ -397,19 +397,23 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
|
||||
public void resume()
|
||||
{
|
||||
Callback onFrame;
|
||||
Runnable delayedFrame = null;
|
||||
synchronized (this)
|
||||
{
|
||||
onFrame = suspendedCallback;
|
||||
suspendedCallback = null;
|
||||
|
||||
switch(state)
|
||||
{
|
||||
case DEMANDING:
|
||||
throw new IllegalStateException("Already Resumed");
|
||||
|
||||
case SUSPENDED:
|
||||
delayedFrame = delayedOnFrame;
|
||||
delayedOnFrame = null;
|
||||
state = SuspendState.DEMANDING;
|
||||
break;
|
||||
|
||||
case SUSPENDING:
|
||||
if (delayedOnFrame != null)
|
||||
throw new IllegalStateException();
|
||||
state = SuspendState.DEMANDING;
|
||||
break;
|
||||
|
||||
|
@ -418,8 +422,8 @@ public class JettyWebSocketFrameHandler implements FrameHandler
|
|||
}
|
||||
}
|
||||
|
||||
if (onFrame != null)
|
||||
onFrame.succeeded();
|
||||
if (delayedFrame != null)
|
||||
delayedFrame.run();
|
||||
}
|
||||
|
||||
private void demand()
|
||||
|
|
Loading…
Reference in New Issue