Issue #3167 - simplification of WebSocketChannelState from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-01-15 11:58:39 +11:00
parent 1e4ac07511
commit 63a1db39a0
2 changed files with 94 additions and 121 deletions

View File

@ -153,7 +153,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
if (!(frame instanceof ParsedFrame)) // already check in parser
new CloseStatus(frame.getPayload());
CloseStatus.getCloseStatus(frame);
}
}
else
@ -473,9 +473,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
if (closed)
{
@ -621,7 +620,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
connection.cancelDemand();
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closed)
{
callback = new Callback.Nested(callback)
@ -642,13 +640,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void completed()
{
// was a close sent by the handler? todo what if someone beats this to close
if (channelState.isOutOpen())
{
// No!
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus);
// this may race with a rare application close but errors are ignored
close(closeStatus.getCode(), closeStatus.getReason(), Callback.NOOP);
return;
}

View File

@ -35,41 +35,19 @@ public class WebSocketChannelState
OPEN,
ICLOSED,
OCLOSED,
CLOSED;
public boolean isClosed()
{
return this.equals(CLOSED);
}
public boolean isInOpen()
{
if (this.equals(OPEN) || this.equals(OCLOSED))
return true;
return false;
}
public boolean isOutOpen()
{
if (this.equals(OPEN) || this.equals(ICLOSED))
return true;
return false;
}
CLOSED
}
private State _channelState = State.CONNECTING;
private byte _incomingSequence = OpCode.UNDEFINED;
private byte _outgoingSequence = OpCode.UNDEFINED;
private byte _incomingContinuation = OpCode.UNDEFINED;
private byte _outgoingContinuation = OpCode.UNDEFINED;
CloseStatus _closeStatus = null;
public void onConnected()
{
synchronized (this)
{
if (!_channelState.equals(State.CONNECTING))
if (_channelState != State.CONNECTING)
throw new IllegalStateException(_channelState.toString());
_channelState = State.CONNECTED;
@ -80,7 +58,7 @@ public class WebSocketChannelState
{
synchronized (this)
{
if (!_channelState.equals(State.CONNECTED))
if (_channelState != State.CONNECTED)
throw new IllegalStateException(_channelState.toString());
_channelState = State.OPEN;
@ -104,17 +82,19 @@ public class WebSocketChannelState
public boolean isClosed()
{
return getState().isClosed();
return getState()==State.CLOSED;
}
public boolean isInOpen()
{
return getState().isInOpen();
State state = getState();
return (state==State.OPEN || state==State.OCLOSED);
}
public boolean isOutOpen()
{
return getState().isOutOpen();
State state = getState();
return (state==State.OPEN || state==State.ICLOSED);
}
public CloseStatus getCloseStatus()
@ -125,12 +105,11 @@ public class WebSocketChannelState
}
}
public boolean onClosed(CloseStatus closeStatus)
{
synchronized (this)
{
if (_channelState.equals(State.CLOSED))
if (_channelState == State.CLOSED)
return false;
_closeStatus = closeStatus;
@ -139,103 +118,98 @@ public class WebSocketChannelState
}
}
public boolean checkOutgoing(Frame frame) throws ProtocolException
{
synchronized (this)
{
_outgoingSequence = getNextState(frame, _outgoingSequence);
if (frame.getOpCode() == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
_channelState = updateChannelState(_channelState, _incomingSequence, _outgoingSequence);
}
return _channelState.equals(State.CLOSED);
}
}
public boolean checkIncoming(Frame frame) throws ProtocolException
{
synchronized (this)
{
_incomingSequence = getNextState(frame, _incomingSequence);
if (frame.getOpCode() == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
_channelState = updateChannelState(_channelState, _incomingSequence, _outgoingSequence);
}
return _channelState.equals(State.CLOSED);
}
}
private static State updateChannelState(State state, byte _incomingSequence, byte _outgoingSequence)
{
switch (state)
{
case OPEN:
case ICLOSED:
case OCLOSED:
break;
default:
throw new IllegalStateException(state.toString());
}
State newState = state;
if ((_outgoingSequence == OpCode.CLOSE) && (_incomingSequence == OpCode.CLOSE))
newState = State.CLOSED;
else if (_outgoingSequence == OpCode.CLOSE)
newState = State.OCLOSED;
else if (_incomingSequence == OpCode.CLOSE)
newState = State.ICLOSED;
return newState;
}
public static byte getNextState(Frame frame, byte state) throws ProtocolException
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
if (state == OpCode.CLOSE)
throw new ProtocolException(OpCode.name(opcode) + " after CLOSE");
synchronized (this)
{
if (!isOutOpen())
throw new IllegalStateException(_channelState.toString());
if (opcode == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
switch (_channelState)
{
case OPEN:
_channelState = State.OCLOSED;
return false;
case ICLOSED:
_channelState = State.CLOSED;
return true;
default:
throw new IllegalStateException(_channelState.toString());
}
}
else if (frame.isDataFrame())
{
_outgoingContinuation = checkDataSequence(opcode, fin, _outgoingContinuation);
}
}
return false;
}
public boolean checkIncoming(Frame frame) throws ProtocolException
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
synchronized (this)
{
if (!isInOpen())
throw new IllegalStateException(_channelState.toString());
if (opcode == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
switch (_channelState)
{
case OPEN:
_channelState = State.ICLOSED;
return false;
case OCLOSED:
_channelState = State.CLOSED;
return true;
default:
throw new IllegalStateException(_channelState.toString());
}
}
else if (frame.isDataFrame())
{
_incomingContinuation = checkDataSequence(opcode, fin, _incomingContinuation);
}
}
return false;
}
private static byte checkDataSequence(byte opcode, boolean fin, byte lastOpCode) throws ProtocolException
{
switch (opcode)
{
case OpCode.UNDEFINED:
throw new ProtocolException("UNDEFINED OpCode: " + OpCode.name(opcode));
case OpCode.CONTINUATION:
if (state == OpCode.UNDEFINED)
throw new ProtocolException("CONTINUATION after fin==true");
if (fin)
return OpCode.UNDEFINED;
return state;
case OpCode.CLOSE:
return OpCode.CLOSE;
case OpCode.PING:
case OpCode.PONG:
return state;
case OpCode.TEXT:
case OpCode.BINARY:
default:
if (state != OpCode.UNDEFINED)
if (lastOpCode != OpCode.UNDEFINED)
throw new ProtocolException("DataFrame before fin==true");
if (!fin)
return opcode;
return OpCode.UNDEFINED;
case OpCode.CONTINUATION:
if (lastOpCode == OpCode.UNDEFINED)
throw new ProtocolException("CONTINUATION after fin==true");
if (fin)
return OpCode.UNDEFINED;
return lastOpCode;
default:
return lastOpCode;
}
}
}