406449 - Session's disconnect not detected

+ Vastly cleaned up IOState and ConnectionState behavior
This commit is contained in:
Joakim Erdfelt 2013-04-30 15:53:42 -07:00
parent b67f8204a5
commit 9ad5ab1ed5
18 changed files with 780 additions and 216 deletions

View File

@ -104,8 +104,8 @@ public class TimeoutTest
// Make sure idle timeout takes less than 5 total seconds
Assert.assertThat("Idle Timeout",dur,lessThanOrEqualTo(5000L));
// Client should see a close event, with status NO_CLOSE
wsocket.assertCloseCode(StatusCode.NORMAL);
// Client should see a close event, with status SHUTDOWN
wsocket.assertCloseCode(StatusCode.SHUTDOWN);
}
finally
{

View File

@ -20,13 +20,12 @@ package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.toolchain.test.EventQueue;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.Session;
@ -46,8 +45,8 @@ public class TrackingSocket extends WebSocketAdapter
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public BlockingQueue<String> messageQueue = new BlockingArrayQueue<String>();
public BlockingQueue<Throwable> errorQueue = new BlockingArrayQueue<>();
public EventQueue<String> messageQueue = new EventQueue<>();
public EventQueue<Throwable> errorQueue = new EventQueue<>();
public void assertClose(int expectedStatusCode, String expectedReason) throws InterruptedException
{
@ -93,29 +92,9 @@ public class TrackingSocket extends WebSocketAdapter
Assert.assertThat("Was Opened",openLatch.await(500,TimeUnit.MILLISECONDS),is(true));
}
public void awaitMessage(int expectedMessageCount, TimeUnit timeoutUnit, int timeoutDuration) throws TimeoutException
public void awaitMessage(int expectedMessageCount, TimeUnit timeoutUnit, int timeoutDuration) throws TimeoutException, InterruptedException
{
long msDur = TimeUnit.MILLISECONDS.convert(timeoutDuration,timeoutUnit);
long now = System.currentTimeMillis();
long expireOn = now + msDur;
LOG.debug("Await Message.. Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur);
while (messageQueue.size() < expectedMessageCount)
{
try
{
TimeUnit.MILLISECONDS.sleep(20);
}
catch (InterruptedException gnore)
{
/* ignore */
}
if (!LOG.isDebugEnabled() && (System.currentTimeMillis() > expireOn))
{
throw new TimeoutException(String.format("Timeout reading all %d expected messages. (managed to only read %d messages)",expectedMessageCount,
messageQueue.size()));
}
}
messageQueue.awaitEventCount(expectedMessageCount,timeoutDuration,timeoutUnit);
}
public void clear()

View File

@ -54,6 +54,7 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.Frame.Type;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.AcceptHash;
@ -219,6 +220,12 @@ public class BlockheadServer
}
WebSocketFrame copy = new WebSocketFrame(frame);
incomingFrames.incomingFrame(copy);
if (frame.getType() == Type.CLOSE)
{
CloseInfo close = new CloseInfo(frame);
LOG.debug("Close frame: {}",close);
}
}
@Override

View File

@ -18,13 +18,43 @@
package org.eclipse.jetty.websocket.common;
import org.eclipse.jetty.websocket.common.io.IOState;
/**
* Connection states as outlined in <a href="https://tools.ietf.org/html/rfc6455">RFC6455</a>.
*/
public enum ConnectionState
{
/** [RFC] Initial state of a connection, the upgrade request / response is in progress */
CONNECTING,
/**
* [Impl] Intermediate state between CONNECTING and OPEN, used to indicate that a upgrade request/response is successful, but the end-user provided socket's
* onOpen code has yet to run.
* <p>
* This state is to allow the local socket to initiate messages and frames, but to NOT start reading yet.
*/
CONNECTED,
/**
* [RFC] The websocket connection is established and open.
* <p>
* This indicates that the Upgrade has succeed, and the end-user provided socket's onOpen code has completed.
* <p>
* It is now time to start reading from the remote endpoint.
*/
OPEN,
/**
* [RFC] The websocket closing handshake is started.
* <p>
* This can be considered a half-closed state.
* <p>
* When receiving this as an event on {@link IOState.ConnectionStateListener#onConnectionStateChange(ConnectionState)} a close frame should be sent using
* the {@link CloseInfo} available from {@link IOState#getCloseInfo()}
*/
CLOSING,
/**
* [RFC] The websocket connection is closed.
* <p>
* Connection should be disconnected and no further reads or writes should occur.
*/
CLOSED;
}

View File

@ -48,9 +48,11 @@ import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
@ManagedObject
public class WebSocketSession extends ContainerLifeCycle implements Session, IncomingFrames
@ManagedObject("A Jetty WebSocket Session")
public class WebSocketSession extends ContainerLifeCycle implements Session, IncomingFrames, ConnectionStateListener
{
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private final URI requestURI;
@ -80,6 +82,8 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
this.outgoingHandler = connection;
this.incomingHandler = websocket;
this.connection.getIOState().addListener(this);
// Get the parameter map (use the jetty MultiMap to do this right)
MultiMap<String> params = new MultiMap<>();
String query = requestURI.getQuery();
@ -254,6 +258,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return remote.getInetSocketAddress();
}
public URI getRequestURI()
{
return requestURI;
}
@Override
public UpgradeRequest getUpgradeRequest()
{
@ -281,12 +290,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
@Override
public void incomingError(WebSocketException e)
{
if (connection.getIOState().isInputClosed())
if (connection.getIOState().isInputAvailable())
{
return; // input is closed
// Forward Errors to User WebSocket Object
websocket.incomingError(e);
}
// Forward Errors to User WebSocket Object
websocket.incomingError(e);
}
/**
@ -295,13 +303,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
@Override
public void incomingFrame(Frame frame)
{
if (connection.getIOState().isInputClosed())
if (connection.getIOState().isInputAvailable())
{
return; // input is closed
// Forward Frames Through Extension List
incomingHandler.incomingFrame(frame);
}
// Forward Frames Through Extension List
incomingHandler.incomingFrame(frame);
}
@Override
@ -332,6 +338,24 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
websocket.onClose(new CloseInfo(statusCode,reason));
}
@Override
public void onConnectionStateChange(ConnectionState state)
{
if (state == ConnectionState.CLOSED)
{
IOState ioState = this.connection.getIOState();
// The session only cares about abnormal close, as we need to notify
// the endpoint of this close scenario.
if (ioState.wasAbnormalClose())
{
CloseInfo close = ioState.getCloseInfo();
LOG.debug("Detected abnormal close: {}",close);
// notify local endpoint
notifyClose(close.getStatusCode(),close.getReason());
}
}
}
/**
* Open/Activate the session
*
@ -345,12 +369,18 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return;
}
// Upgrade success
connection.getIOState().onConnected();
// Connect remote
remote = new WebSocketRemoteEndpoint(connection,outgoingHandler);
// Open WebSocket
websocket.openSession(this);
// Open connection
connection.getIOState().onOpened();
if (LOG.isDebugEnabled())
{
LOG.debug("open -> {}",dump());

View File

@ -103,16 +103,7 @@ public abstract class EventDriver implements IncomingFrames
onClose(close);
// process handshake
if (session.getConnection().getIOState().onCloseHandshake(true))
{
// handshake resolved, disconnect.
session.getConnection().disconnect();
}
else
{
// respond
session.close(close.getStatusCode(),close.getReason());
}
session.getConnection().getIOState().onCloseRemote(close);
return;
}

View File

@ -38,11 +38,12 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.FutureWriteCallback;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* MuxChannel, acts as WebSocketConnection for specific sub-channel.
*/
public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken
public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendToken, ConnectionStateListener
{
private static final Logger LOG = Log.getLogger(MuxChannel.class);
@ -65,7 +66,7 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
this.suspendToken = new AtomicBoolean(false);
this.ioState = new IOState();
ioState.setState(ConnectionState.CONNECTING);
this.ioState.addListener(this);
this.inputClosed = new AtomicBoolean(false);
this.outputClosed = new AtomicBoolean(false);
@ -88,7 +89,6 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
@Override
public void disconnect()
{
this.ioState.setState(ConnectionState.CLOSED);
// TODO: disconnect the virtual end-point?
}
@ -173,12 +173,18 @@ public class MuxChannel implements LogicalConnection, IncomingFrames, SuspendTok
public void onClose()
{
this.ioState.setState(ConnectionState.CLOSED);
}
@Override
public void onConnectionStateChange(ConnectionState state)
{
// TODO Auto-generated method stub
}
public void onOpen()
{
this.ioState.setState(ConnectionState.OPEN);
this.ioState.onOpened();
}
/**

View File

@ -56,11 +56,12 @@ import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
* Provides the implementation of {@link WebSocketConnection} within the framework of the new {@link Connection} framework of jetty-io
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, ConnectionStateListener
{
private class FlushCallback implements Callback
{
@ -141,21 +142,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
private class OnCloseCallback implements WriteCallback
{
@Override
public void writeFailed(Throwable x)
{
disconnect();
}
@Override
public void writeSuccess()
{
onWriteWebSocketClose();
}
}
public static class Stats
{
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
@ -211,7 +197,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
this.extensions = new ArrayList<>();
this.suspendToken = new AtomicBoolean(false);
this.ioState = new IOState();
this.ioState.setState(ConnectionState.CONNECTING);
this.ioState.addListener(this);
this.writeBytes = new WriteBytesProvider(generator,new FlushCallback());
this.setInputBufferSize(policy.getInputBufferSize());
}
@ -247,12 +233,18 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void disconnect()
{
synchronized (writeBytes)
{
if (!writeBytes.isClosed())
{
writeBytes.close();
}
}
disconnect(false);
}
public void disconnect(boolean onlyOutput)
{
ioState.setState(ConnectionState.CLOSED);
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
@ -276,18 +268,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
*/
private void enqueClose(int statusCode, String reason)
{
synchronized (writeBytes)
{
// It is possible to get close events from many different sources.
// Make sure we only sent 1 over the network.
if (writeBytes.isClosed())
{
// already sent the close
return;
}
}
CloseInfo close = new CloseInfo(statusCode,reason);
outgoingFrame(close.asFrame(),new OnCloseCallback());
ioState.onCloseLocal(close);
}
protected void execute(Runnable task)
@ -438,10 +420,31 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onClose()
{
super.onClose();
this.getIOState().setState(ConnectionState.CLOSED);
writeBytes.close();
}
@Override
public void onConnectionStateChange(ConnectionState state)
{
LOG.debug("Connection State Change: {}",state);
switch (state)
{
case OPEN:
LOG.debug("fillInterested");
fillInterested();
break;
case CLOSED:
this.disconnect();
break;
case CLOSING:
CloseInfo close = ioState.getCloseInfo();
// append close frame
outgoingFrame(close.asFrame(),null);
default:
break;
}
}
@Override
public void onFillable()
{
@ -482,18 +485,16 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void onOpen()
{
super.onOpen();
this.ioState.setState(ConnectionState.OPEN);
LOG.debug("fillInterested");
fillInterested();
this.ioState.onOpened();
}
@Override
protected boolean onReadTimeout()
{
LOG.info("Read Timeout");
LOG.debug("Read Timeout");
IOState state = getIOState();
if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
if ((state.getConnectionState() == ConnectionState.CLOSING) || (state.getConnectionState() == ConnectionState.CLOSED))
{
// close already initiated, extra timeouts not relevant
// allow underlying connection and endpoint to disconnect on its own
@ -501,24 +502,12 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
// Initiate close - politely send close frame.
// Note: it is not possible in 100% of cases during read timeout to send this close frame.
session.incomingError(new WebSocketTimeoutException("Timeout on Read"));
session.close(StatusCode.NORMAL,"Idle Timeout");
// Force closure of writeBytes
writeBytes.close();
close(StatusCode.SHUTDOWN,"Idle Timeout");
return false;
}
public void onWriteWebSocketClose()
{
if (ioState.onCloseHandshake(false))
{
disconnect();
}
}
/**
* Frame from API, User, or Internal implementation destined for network.
*/
@ -550,6 +539,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
else if (filled < 0)
{
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
ioState.onReadEOF();
return -1;
}
else

View File

@ -19,39 +19,79 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
/**
* Simple state tracker for Input / Output and {@link ConnectionState}
* Simple state tracker for Input / Output and {@link ConnectionState}.
* <p>
* Use the various known .on*() methods to trigger a state change.
* <ul>
* <li>{@link #onOpened()} - connection has been opened</li>
* </ul>
*/
public class IOState
{
/**
* The source of a close handshake. (ie: who initiated it).
*/
private static enum CloseHandshakeSource
{
/** No close handshake initiated (yet) */
NONE,
/** Local side initiated the close handshake */
LOCAL,
/** Remote side initiated the close handshake */
REMOTE,
/** An abnormal close situation (disconnect, timeout, etc...) */
ABNORMAL;
}
public static interface ConnectionStateListener
{
public void onConnectionStateChange(ConnectionState state);
}
private static final Logger LOG = Log.getLogger(IOState.class);
private ConnectionState state;
private final AtomicBoolean inputClosed;
private final AtomicBoolean outputClosed;
private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
private final AtomicBoolean inputAvailable;
private final AtomicBoolean outputAvailable;
private final AtomicReference<CloseHandshakeSource> closeHandshakeSource;
private final AtomicReference<CloseInfo> closeInfo;
private final AtomicBoolean cleanClose;
private final AtomicBoolean remoteCloseInitiated;
private final AtomicBoolean localCloseInitiated;
/**
* Create a new IOState, initialized to {@link ConnectionState#CONNECTING}
*/
public IOState()
{
this.state = ConnectionState.CONNECTING;
this.inputClosed = new AtomicBoolean(false);
this.outputClosed = new AtomicBoolean(false);
this.remoteCloseInitiated = new AtomicBoolean(false);
this.localCloseInitiated = new AtomicBoolean(false);
this.inputAvailable = new AtomicBoolean(false);
this.outputAvailable = new AtomicBoolean(false);
this.closeHandshakeSource = new AtomicReference<>(CloseHandshakeSource.NONE);
this.closeInfo = new AtomicReference<>();
this.cleanClose = new AtomicBoolean(false);
}
public void addListener(ConnectionStateListener listener)
{
listeners.add(listener);
}
public void assertInputOpen() throws IOException
{
if (isInputClosed())
if (!isInputAvailable())
{
throw new IOException("Connection input is closed");
}
@ -59,15 +99,15 @@ public class IOState
public void assertOutputOpen() throws IOException
{
if (isOutputClosed())
if (!isOutputAvailable())
{
throw new IOException("Connection output is closed");
}
}
public boolean awaitClosed(long duration)
public CloseInfo getCloseInfo()
{
return (isInputClosed() && isOutputClosed());
return closeInfo.get();
}
public ConnectionState getConnectionState()
@ -75,90 +115,283 @@ public class IOState
return state;
}
public ConnectionState getState()
{
return state;
}
public boolean isClosed()
{
return (isInputClosed() && isOutputClosed());
synchronized (state)
{
return (state == ConnectionState.CLOSED);
}
}
public boolean isCloseInitiated()
public boolean isInputAvailable()
{
return remoteCloseInitiated.get() || localCloseInitiated.get();
}
public boolean isInputClosed()
{
return inputClosed.get();
return inputAvailable.get();
}
public boolean isOpen()
{
return (getState() != ConnectionState.CLOSED);
return (getConnectionState() != ConnectionState.CLOSED);
}
public boolean isOutputClosed()
public boolean isOutputAvailable()
{
return outputClosed.get();
return outputAvailable.get();
}
private void notifyStateListeners(ConnectionState state)
{
for (ConnectionStateListener listener : listeners)
{
listener.onConnectionStateChange(state);
}
}
/**
* Test for if connection should disconnect or response on a close handshake.
*
* @param incoming
* true if incoming close
* @param close
* the close details.
* @return true if connection should be disconnected now, or false if response to close should be issued.
* A websocket connection has been disconnected for abnormal close reasons.
* <p>
* This is the low level disconnect of the socket. It could be the result of a normal close operation, from an IO error, or even from a timeout.
*/
public boolean onCloseHandshake(boolean incoming)
public void onAbnormalClose(CloseInfo close)
{
boolean in = inputClosed.get();
boolean out = outputClosed.get();
if (incoming)
ConnectionState event = null;
synchronized (this.state)
{
in = true;
this.inputClosed.set(true);
if (!localCloseInitiated.get())
if (this.state == ConnectionState.CLOSED)
{
remoteCloseInitiated.set(true);
// already closed
return;
}
}
else
{
out = true;
this.outputClosed.set(true);
if ( !remoteCloseInitiated.get() )
if (this.state == ConnectionState.OPEN)
{
localCloseInitiated.set(true);
this.cleanClose.set(false);
}
this.state = ConnectionState.CLOSED;
this.closeInfo.compareAndSet(null,close);
this.inputAvailable.set(false);
this.outputAvailable.set(false);
this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
event = this.state;
}
notifyStateListeners(event);
}
/**
* A close handshake has been issued from the local endpoint
*/
public void onCloseLocal(CloseInfo close)
{
ConnectionState event = null;
ConnectionState initialState = this.state;
if (initialState == ConnectionState.CLOSED)
{
// already closed
return;
}
if (initialState == ConnectionState.CONNECTED)
{
// fast close. a local close request from end-user onConnected() method
LOG.debug("FastClose in CONNECTED detected");
// Force the state open (to allow read/write to endpoint)
onOpened();
}
synchronized (this.state)
{
closeInfo.compareAndSet(null,close);
boolean in = inputAvailable.get();
boolean out = outputAvailable.get();
closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.LOCAL);
out = false;
outputAvailable.set(false);
LOG.debug("onCloseLocal(), input={}, output={}",in,out);
if (!in && !out)
{
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose.set(true);
this.state = ConnectionState.CLOSED;
event = this.state;
}
else if (this.state == ConnectionState.OPEN)
{
// We are now entering CLOSING (or half-closed)
this.state = ConnectionState.CLOSING;
event = this.state;
}
}
LOG.debug("onCloseHandshake({}), input={}, output={}",incoming,in,out);
if (in && out)
// Only notify on state change events
if (event != null)
{
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose.set(true);
return true;
notifyStateListeners(event);
// if SHUTDOWN, we don't expect an answer.
if (close.getStatusCode() == StatusCode.SHUTDOWN)
{
synchronized (this.state)
{
this.state = ConnectionState.CLOSED;
cleanClose.set(false);
outputAvailable.set(false);
inputAvailable.set(false);
this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
event = this.state;
}
notifyStateListeners(event);
return;
}
}
}
/**
* A close handshake has been received from the remote endpoint
*/
public void onCloseRemote(CloseInfo close)
{
ConnectionState event = null;
synchronized (this.state)
{
if (this.state == ConnectionState.CLOSED)
{
// already closed
return;
}
closeInfo.compareAndSet(null,close);
boolean in = inputAvailable.get();
boolean out = outputAvailable.get();
closeHandshakeSource.compareAndSet(CloseHandshakeSource.NONE,CloseHandshakeSource.REMOTE);
in = false;
inputAvailable.set(false);
LOG.debug("onCloseRemote(), input={}, output={}",in,out);
if (!in && !out)
{
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose.set(true);
this.state = ConnectionState.CLOSED;
event = this.state;
}
else if (this.state == ConnectionState.OPEN)
{
// We are now entering CLOSING (or half-closed)
this.state = ConnectionState.CLOSING;
event = this.state;
}
}
return false;
// Only notify on state change events
if (event != null)
{
notifyStateListeners(event);
}
}
public void setConnectionState(ConnectionState connectionState)
/**
* WebSocket has successfully upgraded, but the end-user onOpen call hasn't run yet.
* <p>
* This is an intermediate state between the RFC's {@link ConnectionState#CONNECTING} and {@link ConnectionState#OPEN}
*/
public void onConnected()
{
this.state = connectionState;
if (this.state != ConnectionState.CONNECTING)
{
LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
return;
}
ConnectionState event = null;
synchronized (this.state)
{
this.state = ConnectionState.CONNECTED;
this.inputAvailable.set(false); // cannot read (yet)
this.outputAvailable.set(true); // write allowed
event = this.state;
}
notifyStateListeners(event);
}
public void setState(ConnectionState state)
/**
* A websocket connection has failed its upgrade handshake, and is now closed.
*/
public void onFailedUpgrade()
{
this.state = state;
assert (this.state == ConnectionState.CONNECTING);
ConnectionState event = null;
synchronized (this.state)
{
this.state = ConnectionState.CLOSED;
this.cleanClose.set(false);
this.inputAvailable.set(false);
this.outputAvailable.set(false);
event = this.state;
}
notifyStateListeners(event);
}
/**
* A websocket connection has finished its upgrade handshake, and is now open.
*/
public void onOpened()
{
if (this.state != ConnectionState.CONNECTED)
{
LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
return;
}
assert (this.state == ConnectionState.CONNECTED);
ConnectionState event = null;
synchronized (this.state)
{
this.state = ConnectionState.OPEN;
this.inputAvailable.set(true);
this.outputAvailable.set(true);
event = this.state;
}
notifyStateListeners(event);
}
/**
* The local endpoint has reached a read EOF.
* <p>
* This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
*/
public void onReadEOF()
{
ConnectionState event = null;
synchronized (this.state)
{
if (this.state == ConnectionState.CLOSED)
{
// already closed
return;
}
CloseInfo close = new CloseInfo(StatusCode.NO_CLOSE,"Read EOF");
this.cleanClose.set(false);
this.state = ConnectionState.CLOSED;
this.closeInfo.compareAndSet(null,close);
this.inputAvailable.set(false);
this.outputAvailable.set(false);
this.closeHandshakeSource.set(CloseHandshakeSource.ABNORMAL);
event = this.state;
}
notifyStateListeners(event);
}
public boolean wasAbnormalClose()
{
return closeHandshakeSource.get() == CloseHandshakeSource.ABNORMAL;
}
public boolean wasCleanClose()
@ -168,11 +401,11 @@ public class IOState
public boolean wasLocalCloseInitiated()
{
return localCloseInitiated.get();
return closeHandshakeSource.get() == CloseHandshakeSource.LOCAL;
}
public boolean wasRemoteCloseInitiated()
{
return remoteCloseInitiated.get();
return closeHandshakeSource.get() == CloseHandshakeSource.REMOTE;
}
}

View File

@ -258,6 +258,10 @@ public class WriteBytesProvider implements Callback
private void notifySafeFailure(Callback callback, Throwable t)
{
if (callback == null)
{
return;
}
try
{
callback.failed(t);

View File

@ -116,7 +116,7 @@ public class EventDriverTest
driver.incomingFrame(new WebSocketFrame(OpCode.PING).setPayload("PING"));
driver.incomingFrame(WebSocketFrame.text("Text Me"));
driver.incomingFrame(WebSocketFrame.binary().setPayload("Hello Bin"));
driver.incomingFrame(new CloseInfo(StatusCode.SHUTDOWN).asFrame());
driver.incomingFrame(new CloseInfo(StatusCode.SHUTDOWN,"testcase").asFrame());
socket.capture.assertEventCount(6);
socket.capture.assertEventStartsWith(0,"onConnect(");
@ -148,13 +148,14 @@ public class EventDriverTest
}
@Test
public void testListener_Text() throws IOException
public void testListener_Text() throws Exception
{
ListenerBasicSocket socket = new ListenerBasicSocket();
EventDriver driver = wrap(socket);
try (LocalWebSocketSession conn = new LocalWebSocketSession(testname,driver))
{
conn.start();
conn.open();
driver.incomingFrame(WebSocketFrame.text("Hello World"));
driver.incomingFrame(new CloseInfo(StatusCode.NORMAL).asFrame());

View File

@ -0,0 +1,245 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.common.io;
import static org.hamcrest.Matchers.*;
import static org.junit.Assert.*;
import java.util.LinkedList;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.junit.Test;
public class IOStateTest
{
public static class StateTracker implements IOState.ConnectionStateListener
{
private LinkedList<ConnectionState> transitions = new LinkedList<>();
public void assertTransitions(ConnectionState ...states)
{
assertThat("Transitions.count",transitions.size(),is(states.length));
if (states.length > 0)
{
int len = states.length;
for (int i = 0; i < len; i++)
{
assertThat("Transitions[" + i + "]",transitions.get(i),is(states[i]));
}
}
}
public LinkedList<ConnectionState> getTransitions()
{
return transitions;
}
@Override
public void onConnectionStateChange(ConnectionState state)
{
transitions.add(state);
}
}
private void assertCleanClose(IOState state, boolean expected)
{
assertThat("State.cleanClose",state.wasCleanClose(),is(expected));
}
private void assertInputAvailable(IOState state, boolean available)
{
assertThat("State.inputAvailable",state.isInputAvailable(),is(available));
}
private void assertLocalInitiated(IOState state, boolean expected)
{
assertThat("State.localCloseInitiated",state.wasLocalCloseInitiated(),is(expected));
}
private void assertOutputAvailable(IOState state, boolean available)
{
assertThat("State.outputAvailable",state.isOutputAvailable(),is(available));
}
private void assertRemoteInitiated(IOState state, boolean expected)
{
assertThat("State.remoteCloseInitiated",state.wasRemoteCloseInitiated(),is(expected));
}
private void assertState(IOState state, ConnectionState expectedState)
{
assertThat("State",state.getConnectionState(),is(expectedState));
}
@Test
public void testConnectAbnormalClose()
{
IOState state = new IOState();
StateTracker tracker = new StateTracker();
state.addListener(tracker);
assertState(state,ConnectionState.CONNECTING);
// connect
state.onConnected();
assertInputAvailable(state,false);
assertOutputAvailable(state,true);
// open
state.onOpened();
assertInputAvailable(state,true);
assertOutputAvailable(state,true);
// disconnect
state.onAbnormalClose(new CloseInfo(StatusCode.NO_CLOSE,"Oops"));
assertInputAvailable(state,false);
assertOutputAvailable(state,false);
tracker.assertTransitions(ConnectionState.CONNECTED,ConnectionState.OPEN,ConnectionState.CLOSED);
assertState(state,ConnectionState.CLOSED);
// not clean
assertCleanClose(state,false);
assertLocalInitiated(state,false);
assertRemoteInitiated(state,false);
}
@Test
public void testConnectCloseLocalInitiated()
{
IOState state = new IOState();
StateTracker tracker = new StateTracker();
state.addListener(tracker);
assertState(state,ConnectionState.CONNECTING);
// connect
state.onConnected();
assertInputAvailable(state,false);
assertOutputAvailable(state,true);
// open
state.onOpened();
assertInputAvailable(state,true);
assertOutputAvailable(state,true);
// close (local initiated)
state.onCloseLocal(new CloseInfo(StatusCode.NORMAL,"Hi"));
assertInputAvailable(state,true);
assertOutputAvailable(state,false);
assertState(state,ConnectionState.CLOSING);
// close (remote response)
state.onCloseRemote(new CloseInfo(StatusCode.NORMAL,"Hi"));
assertInputAvailable(state,false);
assertOutputAvailable(state,false);
tracker.assertTransitions(ConnectionState.CONNECTED,ConnectionState.OPEN,ConnectionState.CLOSING,ConnectionState.CLOSED);
assertState(state,ConnectionState.CLOSED);
// not clean
assertCleanClose(state,true);
assertLocalInitiated(state,true);
assertRemoteInitiated(state,false);
}
@Test
public void testConnectCloseRemoteInitiated()
{
IOState state = new IOState();
StateTracker tracker = new StateTracker();
state.addListener(tracker);
assertState(state,ConnectionState.CONNECTING);
// connect
state.onConnected();
assertInputAvailable(state,false);
assertOutputAvailable(state,true);
// open
state.onOpened();
assertInputAvailable(state,true);
assertOutputAvailable(state,true);
// close (remote initiated)
state.onCloseRemote(new CloseInfo(StatusCode.NORMAL,"Hi"));
assertInputAvailable(state,false);
assertOutputAvailable(state,true);
assertState(state,ConnectionState.CLOSING);
// close (local response)
state.onCloseLocal(new CloseInfo(StatusCode.NORMAL,"Hi"));
assertInputAvailable(state,false);
assertOutputAvailable(state,false);
tracker.assertTransitions(ConnectionState.CONNECTED,ConnectionState.OPEN,ConnectionState.CLOSING,ConnectionState.CLOSED);
assertState(state,ConnectionState.CLOSED);
// not clean
assertCleanClose(state,true);
assertLocalInitiated(state,false);
assertRemoteInitiated(state,true);
}
@Test
public void testConnectFailure()
{
IOState state = new IOState();
StateTracker tracker = new StateTracker();
state.addListener(tracker);
assertState(state,ConnectionState.CONNECTING);
// fail upgrade
state.onFailedUpgrade();
tracker.assertTransitions(ConnectionState.CLOSED);
assertState(state,ConnectionState.CLOSED);
assertInputAvailable(state,false);
assertOutputAvailable(state,false);
// not clean
assertCleanClose(state,false);
assertLocalInitiated(state,false);
assertRemoteInitiated(state,false);
}
@Test
public void testInit()
{
IOState state = new IOState();
StateTracker tracker = new StateTracker();
state.addListener(tracker);
assertState(state,ConnectionState.CONNECTING);
// do nothing
tracker.assertTransitions();
assertState(state,ConnectionState.CONNECTING);
// not connected yet
assertInputAvailable(state,false);
assertOutputAvailable(state,false);
// no close yet
assertCleanClose(state,false);
assertLocalInitiated(state,false);
assertRemoteInitiated(state,false);
}
}

View File

@ -22,22 +22,25 @@ import java.net.InetSocketAddress;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.junit.rules.TestName;
public class LocalWebSocketConnection implements LogicalConnection, IncomingFrames
public class LocalWebSocketConnection implements LogicalConnection, IncomingFrames, ConnectionStateListener
{
private static final Logger LOG = Log.getLogger(LocalWebSocketConnection.class);
private final String id;
private WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
private boolean open = false;
private IncomingFrames incoming;
private IOState ioState = new IOState();
@ -49,29 +52,33 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
public LocalWebSocketConnection(String id)
{
this.id = id;
this.ioState.addListener(this);
}
public LocalWebSocketConnection(TestName testname)
{
this.id = testname.getMethodName();
this.ioState.addListener(this);
}
@Override
public void close()
{
open = false;
close(StatusCode.NORMAL,null);
}
@Override
public void close(int statusCode, String reason)
{
open = false;
LOG.debug("close({}, {})",statusCode,reason);
CloseInfo close = new CloseInfo(statusCode,reason);
ioState.onCloseLocal(close);
}
@Override
public void disconnect()
{
open = false;
LOG.debug("disconnect()");
}
public IncomingFrames getIncoming()
@ -131,7 +138,7 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
@Override
public boolean isOpen()
{
return open;
return getIOState().isOpen();
}
@Override
@ -140,9 +147,31 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
return false;
}
@Override
public void onConnectionStateChange(ConnectionState state)
{
LOG.debug("Connection State Change: {}",state);
switch (state)
{
case CLOSED:
this.disconnect();
break;
case CLOSING:
if (ioState.wasRemoteCloseInitiated())
{
// send response close frame
CloseInfo close = ioState.getCloseInfo();
LOG.debug("write close frame: {}",close);
ioState.onCloseLocal(close);
}
default:
break;
}
}
public void onOpen() {
LOG.debug("onOpen()");
open = true;
ioState.onOpened();
}
@Override

View File

@ -75,13 +75,6 @@ public class WebSocketServerConnection extends AbstractWebSocketConnection
super.onOpen();
}
@Override
public void onWriteWebSocketClose()
{
// as server, always disconnect if writing close
disconnect();
}
@Override
public void setNextIncomingFrames(IncomingFrames incoming)
{

View File

@ -65,6 +65,7 @@ import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.extensions.ExtensionStack;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.io.http.HttpResponseHeaderParser;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.Assert;
@ -81,7 +82,7 @@ import org.junit.Assert;
* with regards to basic IO behavior, a write should work as expected, a read should work as expected, but <u>what</u> byte it sends or reads is not within its
* scope.
*/
public class BlockheadClient implements IncomingFrames, OutgoingFrames
public class BlockheadClient implements IncomingFrames, OutgoingFrames, ConnectionStateListener
{
private static final String REQUEST_HASH_KEY = "dGhlIHNhbXBsZSBub25jZQ==";
private static final int BUFFER_SIZE = 8192;
@ -140,6 +141,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
this.extensionFactory = new WebSocketExtensionFactory(policy,bufferPool);
this.ioState = new IOState();
this.ioState.addListener(this);
}
public void addExtensions(String xtension)
@ -175,24 +177,22 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
public void close(int statusCode, String message)
{
try
{
CloseInfo close = new CloseInfo(statusCode,message);
CloseInfo close = new CloseInfo(statusCode,message);
if (ioState.onCloseHandshake(false))
ioState.onCloseLocal(close);
if (!ioState.isClosed())
{
WebSocketFrame frame = close.asFrame();
LOG.debug("Issuing: {}",frame);
try
{
this.disconnect();
}
else
{
WebSocketFrame frame = close.asFrame();
LOG.debug("Issuing: {}",frame);
write(frame);
}
}
catch (IOException e)
{
LOG.debug(e);
catch (IOException e)
{
LOG.debug(e);
}
}
}
@ -278,7 +278,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
// configure parser
parser.setIncomingFramesHandler(extensionStack);
ioState.setState(ConnectionState.OPEN);
ioState.onOpened();
LOG.debug("outgoing = {}",outgoing);
LOG.debug("incoming = {}",extensionStack);
@ -393,14 +393,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
if (frame.getType() == Frame.Type.CLOSE)
{
CloseInfo close = new CloseInfo(frame);
if (ioState.onCloseHandshake(true))
{
this.disconnect();
}
else
{
close(close.getStatusCode(),close.getReason());
}
ioState.onCloseRemote(close);
}
WebSocketFrame copy = new WebSocketFrame(frame);
@ -412,6 +405,27 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
return (socket != null) && (socket.isConnected());
}
@Override
public void onConnectionStateChange(ConnectionState state)
{
switch (state)
{
case CLOSED:
this.disconnect();
break;
case CLOSING:
if (ioState.wasRemoteCloseInitiated())
{
CloseInfo close = ioState.getCloseInfo();
close(close.getStatusCode(),close.getReason());
}
break;
default:
/* do nothing */
break;
}
}
@Override
public void outgoingFrame(Frame frame, WriteCallback callback)
{

View File

@ -116,6 +116,9 @@ public class BrowserDebugTool implements WebSocketCreator
// Setup the desired Socket to use for all incoming upgrade requests
factory.setCreator(BrowserDebugTool.this);
// Set the timeout
factory.getPolicy().setIdleTimeout(2000);
}
};

View File

@ -31,6 +31,15 @@ import org.eclipse.jetty.websocket.servlet.WebSocketServletFactory;
*/
public class ExampleEchoServer
{
public final class EchoSocketHandler extends WebSocketHandler
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(new EchoCreator());
}
}
private static final Logger LOG = Log.getLogger(ExampleEchoServer.class);
public static void main(String... args)
@ -96,14 +105,7 @@ public class ExampleEchoServer
connector.setPort(port);
server.addConnector(connector);
wsHandler = new WebSocketHandler()
{
@Override
public void configure(WebSocketServletFactory factory)
{
factory.setCreator(new EchoCreator());
}
};
wsHandler = new EchoSocketHandler();
server.setHandler(wsHandler);
@ -126,6 +128,14 @@ public class ExampleEchoServer
public void runForever() throws Exception
{
server.start();
String host = connector.getHost();
if (host == null)
{
host = "localhost";
}
int port = connector.getLocalPort();
System.err.printf("Echo Server started on ws://%s:%d/%n",host,port);
System.err.println(server.dump());
server.join();
}

View File

@ -5,7 +5,6 @@ org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.common.Generator.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.Fuzzer.LEVEL=DEBUG