Fixes #605 - Guard concurrent calls to WebSocketSession.close().

Introduced an AtomicBoolean to guard AbstractWebSocketConnection.close().
Made IOState code more robust with respect to synchronization.
This commit is contained in:
Simone Bordet 2016-05-31 18:28:12 +02:00
parent 2e21234328
commit 89232a6207
3 changed files with 91 additions and 71 deletions

View File

@ -103,13 +103,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
public void close() public void close()
{ {
/* This is assumed to always be a NORMAL closure, no reason phrase */ /* This is assumed to always be a NORMAL closure, no reason phrase */
connection.close(StatusCode.NORMAL, null); close(StatusCode.NORMAL, null);
} }
@Override @Override
public void close(CloseStatus closeStatus) public void close(CloseStatus closeStatus)
{ {
this.close(closeStatus.getCode(),closeStatus.getPhrase()); close(closeStatus.getCode(),closeStatus.getPhrase());
} }
@Override @Override
@ -149,17 +149,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
{ {
if(LOG.isDebugEnabled()) if(LOG.isDebugEnabled())
LOG.debug("stopping - {}",this); LOG.debug("stopping - {}",this);
try
if (getConnection() != null)
{ {
try close(StatusCode.SHUTDOWN,"Shutdown");
{ }
getConnection().close(StatusCode.SHUTDOWN,"Shutdown"); catch (Throwable t)
} {
catch (Throwable t) LOG.debug("During Connection Shutdown",t);
{
LOG.debug("During Connection Shutdown",t);
}
} }
super.doStop(); super.doStop();
} }

View File

@ -60,6 +60,8 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
*/ */
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
{ {
private final AtomicBoolean closed = new AtomicBoolean();
private class Flusher extends FrameFlusher private class Flusher extends FrameFlusher
{ {
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint) private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
@ -256,10 +258,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override @Override
public void close() public void close()
{ {
if(LOG_CLOSE.isDebugEnabled()) if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug(".close()"); LOG_CLOSE.debug("close()");
CloseInfo close = new CloseInfo(); close(new CloseInfo());
this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF);
} }
/** /**
@ -278,9 +279,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
public void close(int statusCode, String reason) public void close(int statusCode, String reason)
{ {
if (LOG_CLOSE.isDebugEnabled()) if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("close({},{})",statusCode,reason); LOG_CLOSE.debug("close({},{})", statusCode, reason);
CloseInfo close = new CloseInfo(statusCode,reason); close(new CloseInfo(statusCode, reason));
this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF); }
private void close(CloseInfo closeInfo)
{
if (closed.compareAndSet(false, true))
outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
} }
@Override @Override
@ -408,7 +414,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override @Override
public boolean isOpen() public boolean isOpen()
{ {
return getIOState().isOpen() && getEndPoint().isOpen(); return !closed.get();
} }
@Override @Override

View File

@ -150,7 +150,7 @@ public class IOState
public boolean isClosed() public boolean isClosed()
{ {
synchronized (state) synchronized (this)
{ {
return (state == ConnectionState.CLOSED); return (state == ConnectionState.CLOSED);
} }
@ -163,7 +163,7 @@ public class IOState
public boolean isOpen() public boolean isOpen()
{ {
return (getConnectionState() != ConnectionState.CLOSED); return !isClosed();
} }
public boolean isOutputAvailable() public boolean isOutputAvailable()
@ -221,67 +221,87 @@ public class IOState
/** /**
* A close handshake has been issued from the local endpoint * A close handshake has been issued from the local endpoint
* @param close the close information * @param closeInfo the close information
*/ */
public void onCloseLocal(CloseInfo close) public void onCloseLocal(CloseInfo closeInfo)
{
boolean open = false;
synchronized (this)
{
ConnectionState initialState = this.state;
if (LOG.isDebugEnabled())
LOG.debug("onCloseLocal({}) : {}", closeInfo, initialState);
if (initialState == ConnectionState.CLOSED)
{
// already closed
if (LOG.isDebugEnabled())
LOG.debug("already closed");
return;
}
if (initialState == ConnectionState.CONNECTED)
{
// fast close. a local close request from end-user onConnect/onOpen method
if (LOG.isDebugEnabled())
LOG.debug("FastClose in CONNECTED detected");
open = true;
}
}
if (open)
openAndCloseLocal(closeInfo);
else
closeLocal(closeInfo);
}
private void openAndCloseLocal(CloseInfo closeInfo)
{
// Force the state open (to allow read/write to endpoint)
onOpened();
if (LOG.isDebugEnabled())
LOG.debug("FastClose continuing with Closure");
closeLocal(closeInfo);
}
private void closeLocal(CloseInfo closeInfo)
{ {
ConnectionState event = null; ConnectionState event = null;
ConnectionState abnormalEvent = null; ConnectionState abnormalEvent = null;
ConnectionState initialState = this.state;
if (LOG.isDebugEnabled())
LOG.debug("onCloseLocal({}) : {}",close,initialState);
if (initialState == ConnectionState.CLOSED)
{
// already closed
LOG.debug("already closed");
return;
}
if (initialState == ConnectionState.CONNECTED)
{
// fast close. a local close request from end-user onConnect/onOpen method
LOG.debug("FastClose in CONNECTED detected");
// Force the state open (to allow read/write to endpoint)
onOpened();
if (LOG.isDebugEnabled())
LOG.debug("FastClose continuing with Closure");
}
synchronized (this) synchronized (this)
{ {
closeInfo = close; if (LOG.isDebugEnabled())
LOG.debug("onCloseLocal(), input={}, output={}", inputAvailable, outputAvailable);
// Turn off further output
this.closeInfo = closeInfo;
// Turn off further output.
outputAvailable = false; outputAvailable = false;
boolean in = inputAvailable;
boolean out = outputAvailable;
if (closeHandshakeSource == CloseHandshakeSource.NONE) if (closeHandshakeSource == CloseHandshakeSource.NONE)
{ {
closeHandshakeSource = CloseHandshakeSource.LOCAL; closeHandshakeSource = CloseHandshakeSource.LOCAL;
} }
LOG.debug("onCloseLocal(), input={}, output={}",in,out);
if (!in && !out) if (!inputAvailable)
{ {
LOG.debug("Close Handshake satisfied, disconnecting"); if (LOG.isDebugEnabled())
LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose = true; cleanClose = true;
this.state = ConnectionState.CLOSED; this.state = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close); finalClose.compareAndSet(null,closeInfo);
event = this.state; event = this.state;
} }
else if (this.state == ConnectionState.OPEN) else if (this.state == ConnectionState.OPEN)
{ {
// We are now entering CLOSING (or half-closed) // We are now entering CLOSING (or half-closed).
this.state = ConnectionState.CLOSING; this.state = ConnectionState.CLOSING;
event = this.state; event = this.state;
// if abnormal, we don't expect an answer. // If abnormal, we don't expect an answer.
if (close.isAbnormal()) if (closeInfo.isAbnormal())
{ {
abnormalEvent = ConnectionState.CLOSED; abnormalEvent = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close); finalClose.compareAndSet(null,closeInfo);
cleanClose = false; cleanClose = false;
outputAvailable = false; outputAvailable = false;
inputAvailable = false; inputAvailable = false;
@ -303,12 +323,12 @@ public class IOState
/** /**
* A close handshake has been received from the remote endpoint * A close handshake has been received from the remote endpoint
* @param close the close information * @param closeInfo the close information
*/ */
public void onCloseRemote(CloseInfo close) public void onCloseRemote(CloseInfo closeInfo)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onCloseRemote({})",close); LOG.debug("onCloseRemote({})", closeInfo);
ConnectionState event = null; ConnectionState event = null;
synchronized (this) synchronized (this)
{ {
@ -318,27 +338,25 @@ public class IOState
return; return;
} }
closeInfo = close; if (LOG.isDebugEnabled())
LOG.debug("onCloseRemote(), input={}, output={}", inputAvailable, outputAvailable);
this.closeInfo = closeInfo;
// turn off further input // turn off further input
inputAvailable = false; inputAvailable = false;
boolean in = inputAvailable;
boolean out = outputAvailable;
if (closeHandshakeSource == CloseHandshakeSource.NONE) if (closeHandshakeSource == CloseHandshakeSource.NONE)
{ {
closeHandshakeSource = CloseHandshakeSource.REMOTE; closeHandshakeSource = CloseHandshakeSource.REMOTE;
} }
if (LOG.isDebugEnabled()) if (!outputAvailable)
LOG.debug("onCloseRemote(), input={}, output={}",in,out);
if (!in && !out)
{ {
LOG.debug("Close Handshake satisfied, disconnecting"); LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose = true; cleanClose = true;
state = ConnectionState.CLOSED; state = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close); finalClose.compareAndSet(null,closeInfo);
event = this.state; event = this.state;
} }
else if (this.state == ConnectionState.OPEN) else if (this.state == ConnectionState.OPEN)