Issue #3167 - Refactor of WebSocketChannelState
WebSocketChannelState now performs frame sequence checks and is called on every outgoing and incoming frame, these checks return true to indicate that the WebSocketChannel is fully closed unrelated changes in ExtensionStack: succeeded() needed to be called instead of failed as explained in the comment above the change WebSocketClient: removed duplication of the connect code by calling connect again with a null UpgradeRequest FrameFlusher: improved the logging Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
parent
9bd9133399
commit
9a7e7bc999
|
@ -18,6 +18,18 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.client;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.CookieStore;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
import org.eclipse.jetty.client.HttpClient;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.DecoratedObjectFactory;
|
||||
|
@ -34,18 +46,6 @@ import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
|
|||
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
|
||||
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.CookieStore;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadLocalRandom;
|
||||
|
||||
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy
|
||||
{
|
||||
private final WebSocketCoreClient coreClient;
|
||||
|
@ -93,9 +93,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
|
||||
public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
|
||||
{
|
||||
ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, null, toUri, websocket);
|
||||
coreClient.connect(upgradeRequest);
|
||||
return upgradeRequest.getFutureSession();
|
||||
return connect(websocket, toUri, null);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -107,7 +105,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
|
|||
* @return the future for the session, available on success of connect
|
||||
* @throws IOException if unable to connect
|
||||
*/
|
||||
public CompletableFuture<Session> connect(Object websocket, URI toUri, org.eclipse.jetty.websocket.api.UpgradeRequest request) throws IOException
|
||||
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request) throws IOException
|
||||
{
|
||||
ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, request, toUri, websocket);
|
||||
coreClient.connect(upgradeRequest);
|
||||
|
|
|
@ -18,6 +18,15 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.core.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Queue;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.DecoratedObjectFactory;
|
||||
|
@ -34,15 +43,6 @@ import org.eclipse.jetty.websocket.core.IncomingFrames;
|
|||
import org.eclipse.jetty.websocket.core.OutgoingFrames;
|
||||
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.ListIterator;
|
||||
import java.util.Queue;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* Represents the stack of Extensions.
|
||||
*/
|
||||
|
@ -353,7 +353,7 @@ public class ExtensionStack implements IncomingFrames, OutgoingFrames, Dumpable
|
|||
// and the failure of a frame may not mean that the whole
|
||||
// connection is now invalid.
|
||||
notifyCallbackFailure(current.callback, cause);
|
||||
super.failed(cause);
|
||||
super.succeeded();
|
||||
}
|
||||
|
||||
private void notifyCallbackSuccess(Callback callback)
|
||||
|
|
|
@ -18,6 +18,13 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.core.internal;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.util.BufferUtil;
|
||||
|
@ -28,14 +35,6 @@ import org.eclipse.jetty.util.log.Logger;
|
|||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.channels.ClosedChannelException;
|
||||
import java.util.ArrayDeque;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Deque;
|
||||
import java.util.List;
|
||||
import java.util.Objects;
|
||||
|
||||
public class FrameFlusher extends IteratingCallback
|
||||
{
|
||||
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
|
||||
|
@ -148,7 +147,7 @@ public class FrameFlusher extends IteratingCallback
|
|||
}
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("{} processed {} entries flush=%b batch=%s: {}",
|
||||
LOG.debug("{} processed {} entries flush={} batch={}: {}",
|
||||
this,
|
||||
entries.size(),
|
||||
flush,
|
||||
|
@ -269,9 +268,8 @@ public class FrameFlusher extends IteratingCallback
|
|||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s@%x[queueSize=%d,aggregate=%s]",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
return String.format("%s[queueSize=%d,aggregate=%s]",
|
||||
super.toString(),
|
||||
getQueueSize(),
|
||||
BufferUtil.toDetailString(batchBuffer));
|
||||
}
|
||||
|
|
|
@ -18,8 +18,17 @@
|
|||
|
||||
package org.eclipse.jetty.websocket.core.internal;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
import org.eclipse.jetty.io.EofException;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Utf8Appendable;
|
||||
import org.eclipse.jetty.util.component.Dumpable;
|
||||
|
@ -40,16 +49,6 @@ import org.eclipse.jetty.websocket.core.WebSocketConstants;
|
|||
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
|
||||
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.SocketAddress;
|
||||
import java.net.SocketException;
|
||||
import java.net.SocketTimeoutException;
|
||||
import java.net.URI;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
/**
|
||||
* The Core WebSocket Session.
|
||||
*/
|
||||
|
@ -59,11 +58,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
private final static CloseStatus NO_CODE = new CloseStatus(CloseStatus.NO_CODE);
|
||||
|
||||
private final Behavior behavior;
|
||||
private final WebSocketChannelState state = new WebSocketChannelState();
|
||||
private final WebSocketChannelState channelState = new WebSocketChannelState();
|
||||
private final FrameHandler handler;
|
||||
private final Negotiated negotiated;
|
||||
private final boolean demanding;
|
||||
private final FrameSequence outgoingSequence = new FrameSequence();
|
||||
|
||||
private WebSocketConnection connection;
|
||||
private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT;
|
||||
|
@ -239,7 +237,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
return state.isOutOpen();
|
||||
return channelState.isOutOpen();
|
||||
}
|
||||
|
||||
public void setWebSocketConnection(WebSocketConnection connection)
|
||||
|
@ -289,7 +287,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
|
||||
public void onClosed(Throwable cause, CloseStatus closeStatus)
|
||||
{
|
||||
if (state.onClosed(closeStatus))
|
||||
if (channelState.onClosed(closeStatus))
|
||||
{
|
||||
connection.cancelDemand();
|
||||
|
||||
|
@ -390,7 +388,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
try
|
||||
{
|
||||
// Upgrade success
|
||||
state.onConnected();
|
||||
channelState.onConnected();
|
||||
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("ConnectionState: Transition to CONNECTED");
|
||||
|
@ -398,7 +396,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
try
|
||||
{
|
||||
// Open connection and handler
|
||||
state.onOpen();
|
||||
channelState.onOpen();
|
||||
handler.onOpen(this);
|
||||
if (!demanding)
|
||||
connection.demand(1);
|
||||
|
@ -459,10 +457,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
|
||||
|
||||
boolean closed;
|
||||
try
|
||||
{
|
||||
assertValidOutgoing(frame);
|
||||
outgoingSequence.check(frame.getOpCode(), frame.isFin());
|
||||
closed = channelState.checkOutgoing(frame);
|
||||
}
|
||||
catch (Throwable ex)
|
||||
{
|
||||
|
@ -476,7 +475,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
|
||||
|
||||
if (state.onCloseOut(closeStatus))
|
||||
if (closed)
|
||||
{
|
||||
callback = new Callback.Nested(callback)
|
||||
{
|
||||
|
@ -485,7 +484,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
{
|
||||
try
|
||||
{
|
||||
handler.onClosed(state.getCloseStatus());
|
||||
handler.onClosed(channelState.getCloseStatus());
|
||||
}
|
||||
catch (Throwable e)
|
||||
{
|
||||
|
@ -598,7 +597,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
maxTextMessageSize = maxSize;
|
||||
}
|
||||
|
||||
private class IncomingAdaptor extends FrameSequence implements IncomingFrames
|
||||
private class IncomingAdaptor implements IncomingFrames
|
||||
{
|
||||
@Override
|
||||
public void onFrame(Frame frame, Callback callback)
|
||||
|
@ -607,24 +606,24 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("receiveFrame({}, {}) - connectionState={}, handler={}",
|
||||
frame, callback, state, handler);
|
||||
frame, callback, channelState, handler);
|
||||
|
||||
|
||||
boolean closed = channelState.checkIncoming(frame);
|
||||
|
||||
check(frame.getOpCode(), frame.isFin());
|
||||
if (state.isInOpen())
|
||||
{
|
||||
// Handle inbound close
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
connection.cancelDemand();
|
||||
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
if (state.onCloseIn(closeStatus))
|
||||
if (closed)
|
||||
{
|
||||
callback = new Callback.Nested(callback)
|
||||
{
|
||||
@Override
|
||||
public void completed()
|
||||
{
|
||||
handler.onClosed(state.getCloseStatus());
|
||||
handler.onClosed(channelState.getCloseStatus());
|
||||
connection.close();
|
||||
}
|
||||
};
|
||||
|
@ -637,8 +636,8 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
@Override
|
||||
public void completed()
|
||||
{
|
||||
// was a close sent by the handler?
|
||||
if (state.isOutOpen())
|
||||
// was a close sent by the handler? todo what if someone beats this to close
|
||||
if (channelState.isOutOpen())
|
||||
{
|
||||
// No!
|
||||
if (LOG.isDebugEnabled())
|
||||
|
@ -654,13 +653,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
// Handle the frame
|
||||
handler.onFrame(frame, callback);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Discarding post EOF frame - {}", frame);
|
||||
callback.failed(new EofException());
|
||||
}
|
||||
}
|
||||
catch (Throwable t)
|
||||
{
|
||||
callback.failed(t);
|
||||
|
@ -740,7 +732,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
|
|||
{
|
||||
return String.format("WSChannel@%x{%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s",
|
||||
hashCode(),
|
||||
state,
|
||||
channelState,
|
||||
negotiated,
|
||||
autoFragment,
|
||||
inputBufferSize,
|
||||
|
|
|
@ -19,140 +19,223 @@
|
|||
package org.eclipse.jetty.websocket.core.internal;
|
||||
|
||||
import org.eclipse.jetty.websocket.core.CloseStatus;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import org.eclipse.jetty.websocket.core.Frame;
|
||||
import org.eclipse.jetty.websocket.core.OpCode;
|
||||
import org.eclipse.jetty.websocket.core.ProtocolException;
|
||||
|
||||
/**
|
||||
* Atomic Connection State
|
||||
*/
|
||||
public class WebSocketChannelState
|
||||
{
|
||||
private static class State
|
||||
enum State
|
||||
{
|
||||
final String name;
|
||||
final boolean inOpen;
|
||||
final boolean outOpen;
|
||||
final CloseStatus closeStatus;
|
||||
|
||||
State(String name, boolean inOpen, boolean outOpen, CloseStatus closeStatus)
|
||||
{
|
||||
this.name = name;
|
||||
this.inOpen = inOpen;
|
||||
this.outOpen = outOpen;
|
||||
this.closeStatus = closeStatus;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return String.format("%s{i=%b o=%b c=%d}", name, inOpen, outOpen, closeStatus == null?-1:closeStatus.getCode());
|
||||
}
|
||||
}
|
||||
|
||||
private static final State CONNECTING = new State("CONNECTING", false, false, null);
|
||||
private static final State CONNECTED = new State("CONNECTED", true, true, null);
|
||||
private static final State OPEN = new State("OPEN", true, true, null);
|
||||
|
||||
private AtomicReference<State> state = new AtomicReference<>(CONNECTING);
|
||||
|
||||
public void onConnected()
|
||||
{
|
||||
if (!state.compareAndSet(CONNECTING, CONNECTED))
|
||||
throw new IllegalStateException(state.get().toString());
|
||||
}
|
||||
|
||||
public void onOpen()
|
||||
{
|
||||
if (!state.compareAndSet(CONNECTED, OPEN))
|
||||
throw new IllegalStateException(state.get().toString());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return state.get().toString();
|
||||
}
|
||||
CONNECTING,
|
||||
CONNECTED,
|
||||
OPEN,
|
||||
ICLOSED,
|
||||
OCLOSED,
|
||||
CLOSED;
|
||||
|
||||
public boolean isClosed()
|
||||
{
|
||||
State s = state.get();
|
||||
return !s.inOpen && !s.outOpen;
|
||||
return this.equals(CLOSED);
|
||||
}
|
||||
|
||||
public boolean isInOpen()
|
||||
{
|
||||
return state.get().inOpen;
|
||||
if (this.equals(OPEN) || this.equals(OCLOSED))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
public boolean isOutOpen()
|
||||
{
|
||||
return state.get().outOpen;
|
||||
if (this.equals(OPEN) || this.equals(ICLOSED))
|
||||
return true;
|
||||
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
private State _channelState = State.CONNECTING;
|
||||
private byte _incomingSequence = OpCode.UNDEFINED;
|
||||
private byte _outgoingSequence = OpCode.UNDEFINED;
|
||||
CloseStatus _closeStatus = null;
|
||||
|
||||
|
||||
public void onConnected()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (!_channelState.equals(State.CONNECTING))
|
||||
throw new IllegalStateException(_channelState.toString());
|
||||
|
||||
_channelState = State.CONNECTED;
|
||||
}
|
||||
}
|
||||
|
||||
public void onOpen()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
if (!_channelState.equals(State.CONNECTED))
|
||||
throw new IllegalStateException(_channelState.toString());
|
||||
|
||||
_channelState = State.OPEN;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString()
|
||||
{
|
||||
return _channelState.toString();
|
||||
}
|
||||
|
||||
|
||||
public State getState()
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
return _channelState;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean isClosed()
|
||||
{
|
||||
return getState().isClosed();
|
||||
}
|
||||
|
||||
public boolean isInOpen()
|
||||
{
|
||||
return getState().isInOpen();
|
||||
}
|
||||
|
||||
public boolean isOutOpen()
|
||||
{
|
||||
return getState().isOutOpen();
|
||||
}
|
||||
|
||||
public CloseStatus getCloseStatus()
|
||||
{
|
||||
return state.get().closeStatus;
|
||||
}
|
||||
|
||||
public boolean onCloseIn(CloseStatus closeStatus)
|
||||
synchronized (this)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
State s = state.get();
|
||||
|
||||
if (!s.inOpen)
|
||||
throw new IllegalStateException(state.get().toString());
|
||||
|
||||
if (s.outOpen)
|
||||
{
|
||||
State closedIn = new State("ICLOSED", false, true, closeStatus);
|
||||
if (state.compareAndSet(s, closedIn))
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
State closed = new State("CLOSED", false, false, closeStatus);
|
||||
if (state.compareAndSet(s, closed))
|
||||
return true;
|
||||
}
|
||||
return _closeStatus;
|
||||
}
|
||||
}
|
||||
|
||||
public boolean onCloseOut(CloseStatus closeStatus)
|
||||
{
|
||||
while (true)
|
||||
{
|
||||
State s = state.get();
|
||||
|
||||
if (!s.outOpen)
|
||||
throw new IllegalStateException(state.get().toString());
|
||||
|
||||
if (s.inOpen)
|
||||
{
|
||||
State closedOut = new State("OCLOSED", true, false, closeStatus);
|
||||
if (state.compareAndSet(s, closedOut))
|
||||
return false;
|
||||
}
|
||||
else
|
||||
{
|
||||
State closed = new State("CLOSED", false, false, closeStatus);
|
||||
if (state.compareAndSet(s, closed))
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean onClosed(CloseStatus closeStatus)
|
||||
{
|
||||
while (true)
|
||||
synchronized (this)
|
||||
{
|
||||
State s = state.get();
|
||||
if (!s.outOpen && !s.inOpen)
|
||||
if (_channelState.equals(State.CLOSED))
|
||||
return false;
|
||||
|
||||
State newState = new State("CLOSED", false, false, closeStatus);
|
||||
if (state.compareAndSet(s, newState))
|
||||
_closeStatus = closeStatus;
|
||||
_channelState = State.CLOSED;
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public boolean checkOutgoing(Frame frame) throws ProtocolException
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
_outgoingSequence = getNextState(frame, _outgoingSequence);
|
||||
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
_closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
_channelState = updateChannelState(_channelState, _incomingSequence, _outgoingSequence);
|
||||
}
|
||||
|
||||
return _channelState.equals(State.CLOSED);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
public boolean checkIncoming(Frame frame) throws ProtocolException
|
||||
{
|
||||
synchronized (this)
|
||||
{
|
||||
_incomingSequence = getNextState(frame, _incomingSequence);
|
||||
|
||||
if (frame.getOpCode() == OpCode.CLOSE)
|
||||
{
|
||||
_closeStatus = CloseStatus.getCloseStatus(frame);
|
||||
_channelState = updateChannelState(_channelState, _incomingSequence, _outgoingSequence);
|
||||
}
|
||||
|
||||
return _channelState.equals(State.CLOSED);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
|
||||
private static State updateChannelState(State state, byte _incomingSequence, byte _outgoingSequence)
|
||||
{
|
||||
switch (state)
|
||||
{
|
||||
case OPEN:
|
||||
case ICLOSED:
|
||||
case OCLOSED:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException(state.toString());
|
||||
}
|
||||
|
||||
State newState = state;
|
||||
if ((_outgoingSequence == OpCode.CLOSE) && (_incomingSequence == OpCode.CLOSE))
|
||||
newState = State.CLOSED;
|
||||
else if (_outgoingSequence == OpCode.CLOSE)
|
||||
newState = State.OCLOSED;
|
||||
else if (_incomingSequence == OpCode.CLOSE)
|
||||
newState = State.ICLOSED;
|
||||
|
||||
return newState;
|
||||
}
|
||||
|
||||
|
||||
public static byte getNextState(Frame frame, byte state) throws ProtocolException
|
||||
{
|
||||
byte opcode = frame.getOpCode();
|
||||
boolean fin = frame.isFin();
|
||||
|
||||
if (state == OpCode.CLOSE)
|
||||
throw new ProtocolException(OpCode.name(opcode) + " after CLOSE");
|
||||
|
||||
switch (opcode)
|
||||
{
|
||||
case OpCode.UNDEFINED:
|
||||
throw new ProtocolException("UNDEFINED OpCode: " + OpCode.name(opcode));
|
||||
|
||||
case OpCode.CONTINUATION:
|
||||
if (state == OpCode.UNDEFINED)
|
||||
throw new ProtocolException("CONTINUATION after fin==true");
|
||||
if (fin)
|
||||
return OpCode.UNDEFINED;
|
||||
return state;
|
||||
|
||||
case OpCode.CLOSE:
|
||||
return OpCode.CLOSE;
|
||||
|
||||
case OpCode.PING:
|
||||
case OpCode.PONG:
|
||||
return state;
|
||||
|
||||
case OpCode.TEXT:
|
||||
case OpCode.BINARY:
|
||||
default:
|
||||
if (state != OpCode.UNDEFINED)
|
||||
throw new ProtocolException("DataFrame before fin==true");
|
||||
if (!fin)
|
||||
return opcode;
|
||||
|
||||
return OpCode.UNDEFINED;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue