Merge pull request #3253 from lachlan-roberts/jetty-10.0.x-3167-2175-websocket-close

Issue #3167 - Refactor of WebSocketChannelState
This commit is contained in:
Greg Wilkins 2019-01-15 13:37:17 +11:00 committed by GitHub
commit a2d3a81cef
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 234 additions and 199 deletions

View File

@ -55,8 +55,6 @@ abstract public class WriteFlusher
private final EndPoint _endPoint;
private final AtomicReference<State> _state = new AtomicReference<>();
Throwable last;
static
{
// fill the state machine
@ -272,13 +270,7 @@ abstract public class WriteFlusher
LOG.debug("write: {} {}", this, BufferUtil.toDetailString(buffers));
if (!updateState(__IDLE, __WRITING))
{
if (last!=null)
last.printStackTrace();
throw new WritePendingException();
}
last = new Throwable();
try
{

View File

@ -18,6 +18,18 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.DecoratedObjectFactory;
@ -34,18 +46,6 @@ import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import java.io.IOException;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.time.Duration;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadLocalRandom;
public class WebSocketClient extends ContainerLifeCycle implements WebSocketPolicy
{
private final WebSocketCoreClient coreClient;
@ -93,9 +93,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
public CompletableFuture<Session> connect(Object websocket, URI toUri) throws IOException
{
ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, null, toUri, websocket);
coreClient.connect(upgradeRequest);
return upgradeRequest.getFutureSession();
return connect(websocket, toUri, null);
}
/**
@ -107,7 +105,7 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
* @return the future for the session, available on success of connect
* @throws IOException if unable to connect
*/
public CompletableFuture<Session> connect(Object websocket, URI toUri, org.eclipse.jetty.websocket.api.UpgradeRequest request) throws IOException
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request) throws IOException
{
ClientUpgradeRequestImpl upgradeRequest = new ClientUpgradeRequestImpl(this, coreClient, request, toUri, websocket);
coreClient.connect(upgradeRequest);

View File

@ -18,6 +18,15 @@
package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Queue;
import java.util.stream.Collectors;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.DecoratedObjectFactory;
@ -34,15 +43,6 @@ import org.eclipse.jetty.websocket.core.IncomingFrames;
import org.eclipse.jetty.websocket.core.OutgoingFrames;
import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry;
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.ListIterator;
import java.util.Queue;
import java.util.stream.Collectors;
/**
* Represents the stack of Extensions.
*/

View File

@ -18,6 +18,13 @@
package org.eclipse.jetty.websocket.core.internal;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
@ -28,14 +35,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedChannelException;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Objects;
public class FrameFlusher extends IteratingCallback
{
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
@ -148,7 +147,7 @@ public class FrameFlusher extends IteratingCallback
}
if (LOG.isDebugEnabled())
LOG.debug("{} processed {} entries flush=%b batch=%s: {}",
LOG.debug("{} processed {} entries flush={} batch={}: {}",
this,
entries.size(),
flush,
@ -269,9 +268,8 @@ public class FrameFlusher extends IteratingCallback
@Override
public String toString()
{
return String.format("%s@%x[queueSize=%d,aggregate=%s]",
getClass().getSimpleName(),
hashCode(),
return String.format("%s[queueSize=%d,aggregate=%s]",
super.toString(),
getQueueSize(),
BufferUtil.toDetailString(batchBuffer));
}

View File

@ -18,8 +18,17 @@
package org.eclipse.jetty.websocket.core.internal;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Utf8Appendable;
import org.eclipse.jetty.util.component.Dumpable;
@ -40,16 +49,6 @@ import org.eclipse.jetty.websocket.core.WebSocketConstants;
import org.eclipse.jetty.websocket.core.WebSocketTimeoutException;
import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame;
import java.io.IOException;
import java.net.SocketAddress;
import java.net.SocketException;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
/**
* The Core WebSocket Session.
*/
@ -59,11 +58,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
private final static CloseStatus NO_CODE = new CloseStatus(CloseStatus.NO_CODE);
private final Behavior behavior;
private final WebSocketChannelState state = new WebSocketChannelState();
private final WebSocketChannelState channelState = new WebSocketChannelState();
private final FrameHandler handler;
private final Negotiated negotiated;
private final boolean demanding;
private final FrameSequence outgoingSequence = new FrameSequence();
private WebSocketConnection connection;
private boolean autoFragment = WebSocketConstants.DEFAULT_AUTO_FRAGMENT;
@ -155,7 +153,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
if (!(frame instanceof ParsedFrame)) // already check in parser
new CloseStatus(frame.getPayload());
CloseStatus.getCloseStatus(frame);
}
}
else
@ -239,7 +237,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public boolean isOpen()
{
return state.isOutOpen();
return channelState.isOutOpen();
}
public void setWebSocketConnection(WebSocketConnection connection)
@ -289,7 +287,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
public void onClosed(Throwable cause, CloseStatus closeStatus)
{
if (state.onClosed(closeStatus))
if (channelState.onClosed(closeStatus))
{
connection.cancelDemand();
@ -390,7 +388,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
try
{
// Upgrade success
state.onConnected();
channelState.onConnected();
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED");
@ -398,7 +396,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
try
{
// Open connection and handler
state.onOpen();
channelState.onOpen();
handler.onOpen(this);
if (!demanding)
connection.demand(1);
@ -461,10 +459,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled())
LOG.debug("sendFrame({}, {}, {})", frame, callback, batch);
boolean closed;
try
{
assertValidOutgoing(frame);
outgoingSequence.check(frame.getOpCode(), frame.isFin());
closed = channelState.checkOutgoing(frame);
}
catch (Throwable ex)
{
@ -474,11 +473,10 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("close({}, {}, {})", closeStatus, callback, batch);
LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch);
if (state.onCloseOut(closeStatus))
if (closed)
{
callback = new Callback.Nested(callback)
{
@ -487,7 +485,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
try
{
handler.onClosed(state.getCloseStatus());
handler.onClosed(channelState.getCloseStatus());
}
catch (Throwable e)
{
@ -604,8 +602,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
maxTextMessageSize = maxSize;
}
private class
IncomingAdaptor extends FrameSequence implements IncomingFrames
private class IncomingAdaptor implements IncomingFrames
{
@Override
public void onFrame(Frame frame, Callback callback)
@ -614,24 +611,23 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
if (LOG.isDebugEnabled())
LOG.debug("receiveFrame({}, {}) - connectionState={}, handler={}",
frame, callback, state, handler);
frame, callback, channelState, handler);
boolean closed = channelState.checkIncoming(frame);
check(frame.getOpCode(), frame.isFin());
if (state.isInOpen())
{
// Handle inbound close
if (frame.getOpCode() == OpCode.CLOSE)
{
connection.cancelDemand();
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (state.onCloseIn(closeStatus))
if (closed)
{
callback = new Callback.Nested(callback)
{
@Override
public void completed()
{
handler.onClosed(state.getCloseStatus());
handler.onClosed(channelState.getCloseStatus());
connection.close();
}
};
@ -644,13 +640,14 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
@Override
public void completed()
{
// was a close sent by the handler?
if (state.isOutOpen())
if (channelState.isOutOpen())
{
// No!
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: sending close response {}", closeStatus);
// this may race with a rare application close but errors are ignored
close(closeStatus.getCode(), closeStatus.getReason(), Callback.NOOP);
return;
}
@ -661,13 +658,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
// Handle the frame
handler.onFrame(frame, callback);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("Discarding post EOF frame - {}", frame);
callback.failed(new EofException());
}
}
catch (Throwable t)
{
callback.failed(t);
@ -747,7 +737,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
return String.format("WSChannel@%x{%s,%s,af=%b,i/o=%d/%d,fs=%d}->%s",
hashCode(),
state,
channelState,
negotiated,
autoFragment,
inputBufferSize,

View File

@ -19,140 +19,197 @@
package org.eclipse.jetty.websocket.core.internal;
import org.eclipse.jetty.websocket.core.CloseStatus;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.websocket.core.Frame;
import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.core.ProtocolException;
/**
* Atomic Connection State
*/
public class WebSocketChannelState
{
private static class State
enum State
{
final String name;
final boolean inOpen;
final boolean outOpen;
final CloseStatus closeStatus;
State(String name, boolean inOpen, boolean outOpen, CloseStatus closeStatus)
{
this.name = name;
this.inOpen = inOpen;
this.outOpen = outOpen;
this.closeStatus = closeStatus;
CONNECTING,
CONNECTED,
OPEN,
ICLOSED,
OCLOSED,
CLOSED
}
@Override
public String toString()
{
return String.format("%s{i=%b o=%b c=%d}", name, inOpen, outOpen, closeStatus == null?-1:closeStatus.getCode());
}
}
private static final State CONNECTING = new State("CONNECTING", false, false, null);
private static final State CONNECTED = new State("CONNECTED", true, true, null);
private static final State OPEN = new State("OPEN", true, true, null);
private AtomicReference<State> state = new AtomicReference<>(CONNECTING);
private State _channelState = State.CONNECTING;
private byte _incomingContinuation = OpCode.UNDEFINED;
private byte _outgoingContinuation = OpCode.UNDEFINED;
CloseStatus _closeStatus = null;
public void onConnected()
{
if (!state.compareAndSet(CONNECTING, CONNECTED))
throw new IllegalStateException(state.get().toString());
synchronized (this)
{
if (_channelState != State.CONNECTING)
throw new IllegalStateException(_channelState.toString());
_channelState = State.CONNECTED;
}
}
public void onOpen()
{
if (!state.compareAndSet(CONNECTED, OPEN))
throw new IllegalStateException(state.get().toString());
synchronized (this)
{
if (_channelState != State.CONNECTED)
throw new IllegalStateException(_channelState.toString());
_channelState = State.OPEN;
}
}
@Override
public String toString()
{
return state.get().toString();
return _channelState.toString();
}
public State getState()
{
synchronized (this)
{
return _channelState;
}
}
public boolean isClosed()
{
State s = state.get();
return !s.inOpen && !s.outOpen;
return getState()==State.CLOSED;
}
public boolean isInOpen()
{
return state.get().inOpen;
State state = getState();
return (state==State.OPEN || state==State.OCLOSED);
}
public boolean isOutOpen()
{
return state.get().outOpen;
State state = getState();
return (state==State.OPEN || state==State.ICLOSED);
}
public CloseStatus getCloseStatus()
{
return state.get().closeStatus;
}
public boolean onCloseIn(CloseStatus closeStatus)
synchronized (this)
{
while (true)
{
State s = state.get();
if (!s.inOpen)
throw new IllegalStateException(state.get().toString());
if (s.outOpen)
{
State closedIn = new State("ICLOSED", false, true, closeStatus);
if (state.compareAndSet(s, closedIn))
return false;
}
else
{
State closed = new State("CLOSED", false, false, closeStatus);
if (state.compareAndSet(s, closed))
return true;
}
}
}
public boolean onCloseOut(CloseStatus closeStatus)
{
while (true)
{
State s = state.get();
if (!s.outOpen)
throw new IllegalStateException(state.get().toString());
if (s.inOpen)
{
State closedOut = new State("OCLOSED", true, false, closeStatus);
if (state.compareAndSet(s, closedOut))
return false;
}
else
{
State closed = new State("CLOSED", false, false, closeStatus);
if (state.compareAndSet(s, closed))
return true;
}
return _closeStatus;
}
}
public boolean onClosed(CloseStatus closeStatus)
{
while (true)
synchronized (this)
{
State s = state.get();
if (!s.outOpen && !s.inOpen)
if (_channelState == State.CLOSED)
return false;
State newState = new State("CLOSED", false, false, closeStatus);
if (state.compareAndSet(s, newState))
_closeStatus = closeStatus;
_channelState = State.CLOSED;
return true;
}
}
public boolean checkOutgoing(Frame frame) throws ProtocolException
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
synchronized (this)
{
if (!isOutOpen())
throw new IllegalStateException(_channelState.toString());
if (opcode == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
switch (_channelState)
{
case OPEN:
_channelState = State.OCLOSED;
return false;
case ICLOSED:
_channelState = State.CLOSED;
return true;
default:
throw new IllegalStateException(_channelState.toString());
}
}
else if (frame.isDataFrame())
{
_outgoingContinuation = checkDataSequence(opcode, fin, _outgoingContinuation);
}
}
return false;
}
public boolean checkIncoming(Frame frame) throws ProtocolException
{
byte opcode = frame.getOpCode();
boolean fin = frame.isFin();
synchronized (this)
{
if (!isInOpen())
throw new IllegalStateException(_channelState.toString());
if (opcode == OpCode.CLOSE)
{
_closeStatus = CloseStatus.getCloseStatus(frame);
switch (_channelState)
{
case OPEN:
_channelState = State.ICLOSED;
return false;
case OCLOSED:
_channelState = State.CLOSED;
return true;
default:
throw new IllegalStateException(_channelState.toString());
}
}
else if (frame.isDataFrame())
{
_incomingContinuation = checkDataSequence(opcode, fin, _incomingContinuation);
}
}
return false;
}
private static byte checkDataSequence(byte opcode, boolean fin, byte lastOpCode) throws ProtocolException
{
switch (opcode)
{
case OpCode.TEXT:
case OpCode.BINARY:
if (lastOpCode != OpCode.UNDEFINED)
throw new ProtocolException("DataFrame before fin==true");
if (!fin)
return opcode;
return OpCode.UNDEFINED;
case OpCode.CONTINUATION:
if (lastOpCode == OpCode.UNDEFINED)
throw new ProtocolException("CONTINUATION after fin==true");
if (fin)
return OpCode.UNDEFINED;
return lastOpCode;
default:
return lastOpCode;
}
}
}