From 89232a62072e7c550f31fbc1dd711dc5f2b6fede Mon Sep 17 00:00:00 2001 From: Simone Bordet Date: Tue, 31 May 2016 18:28:12 +0200 Subject: [PATCH] Fixes #605 - Guard concurrent calls to WebSocketSession.close(). Introduced an AtomicBoolean to guard AbstractWebSocketConnection.close(). Made IOState code more robust with respect to synchronization. --- .../websocket/common/WebSocketSession.java | 20 ++- .../io/AbstractWebSocketConnection.java | 22 ++-- .../jetty/websocket/common/io/IOState.java | 120 ++++++++++-------- 3 files changed, 91 insertions(+), 71 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 88d41c5bb8e..944f86dbe34 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -103,13 +103,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web public void close() { /* This is assumed to always be a NORMAL closure, no reason phrase */ - connection.close(StatusCode.NORMAL, null); + close(StatusCode.NORMAL, null); } @Override public void close(CloseStatus closeStatus) { - this.close(closeStatus.getCode(),closeStatus.getPhrase()); + close(closeStatus.getCode(),closeStatus.getPhrase()); } @Override @@ -149,17 +149,13 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web { if(LOG.isDebugEnabled()) LOG.debug("stopping - {}",this); - - if (getConnection() != null) + try { - try - { - getConnection().close(StatusCode.SHUTDOWN,"Shutdown"); - } - catch (Throwable t) - { - LOG.debug("During Connection Shutdown",t); - } + close(StatusCode.SHUTDOWN,"Shutdown"); + } + catch (Throwable t) + { + LOG.debug("During Connection Shutdown",t); } super.doStop(); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index 562f734a73f..5a4d309c9c5 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -60,6 +60,8 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener; */ public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable { + private final AtomicBoolean closed = new AtomicBoolean(); + private class Flusher extends FrameFlusher { private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint) @@ -256,10 +258,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public void close() { - if(LOG_CLOSE.isDebugEnabled()) - LOG_CLOSE.debug(".close()"); - CloseInfo close = new CloseInfo(); - this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF); + if (LOG_CLOSE.isDebugEnabled()) + LOG_CLOSE.debug("close()"); + close(new CloseInfo()); } /** @@ -278,9 +279,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp public void close(int statusCode, String reason) { if (LOG_CLOSE.isDebugEnabled()) - LOG_CLOSE.debug("close({},{})",statusCode,reason); - CloseInfo close = new CloseInfo(statusCode,reason); - this.outgoingFrame(close.asFrame(),new OnCloseLocalCallback(close),BatchMode.OFF); + LOG_CLOSE.debug("close({},{})", statusCode, reason); + close(new CloseInfo(statusCode, reason)); + } + + private void close(CloseInfo closeInfo) + { + if (closed.compareAndSet(false, true)) + outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF); } @Override @@ -408,7 +414,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override public boolean isOpen() { - return getIOState().isOpen() && getEndPoint().isOpen(); + return !closed.get(); } @Override diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java index 0c8c2599ef5..5640bcc3897 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java @@ -150,7 +150,7 @@ public class IOState public boolean isClosed() { - synchronized (state) + synchronized (this) { return (state == ConnectionState.CLOSED); } @@ -163,7 +163,7 @@ public class IOState public boolean isOpen() { - return (getConnectionState() != ConnectionState.CLOSED); + return !isClosed(); } public boolean isOutputAvailable() @@ -221,67 +221,87 @@ public class IOState /** * A close handshake has been issued from the local endpoint - * @param close the close information + * @param closeInfo the close information */ - public void onCloseLocal(CloseInfo close) + public void onCloseLocal(CloseInfo closeInfo) + { + boolean open = false; + synchronized (this) + { + ConnectionState initialState = this.state; + if (LOG.isDebugEnabled()) + LOG.debug("onCloseLocal({}) : {}", closeInfo, initialState); + if (initialState == ConnectionState.CLOSED) + { + // already closed + if (LOG.isDebugEnabled()) + LOG.debug("already closed"); + return; + } + + if (initialState == ConnectionState.CONNECTED) + { + // fast close. a local close request from end-user onConnect/onOpen method + if (LOG.isDebugEnabled()) + LOG.debug("FastClose in CONNECTED detected"); + open = true; + } + } + + if (open) + openAndCloseLocal(closeInfo); + else + closeLocal(closeInfo); + } + + private void openAndCloseLocal(CloseInfo closeInfo) + { + // Force the state open (to allow read/write to endpoint) + onOpened(); + if (LOG.isDebugEnabled()) + LOG.debug("FastClose continuing with Closure"); + closeLocal(closeInfo); + } + + private void closeLocal(CloseInfo closeInfo) { ConnectionState event = null; ConnectionState abnormalEvent = null; - ConnectionState initialState = this.state; - if (LOG.isDebugEnabled()) - LOG.debug("onCloseLocal({}) : {}",close,initialState); - if (initialState == ConnectionState.CLOSED) - { - // already closed - LOG.debug("already closed"); - return; - } - - if (initialState == ConnectionState.CONNECTED) - { - // fast close. a local close request from end-user onConnect/onOpen method - LOG.debug("FastClose in CONNECTED detected"); - // Force the state open (to allow read/write to endpoint) - onOpened(); - if (LOG.isDebugEnabled()) - LOG.debug("FastClose continuing with Closure"); - } - synchronized (this) { - closeInfo = close; - - // Turn off further output + if (LOG.isDebugEnabled()) + LOG.debug("onCloseLocal(), input={}, output={}", inputAvailable, outputAvailable); + + this.closeInfo = closeInfo; + + // Turn off further output. outputAvailable = false; - boolean in = inputAvailable; - boolean out = outputAvailable; if (closeHandshakeSource == CloseHandshakeSource.NONE) { closeHandshakeSource = CloseHandshakeSource.LOCAL; } - - LOG.debug("onCloseLocal(), input={}, output={}",in,out); - if (!in && !out) + if (!inputAvailable) { - LOG.debug("Close Handshake satisfied, disconnecting"); + if (LOG.isDebugEnabled()) + LOG.debug("Close Handshake satisfied, disconnecting"); cleanClose = true; this.state = ConnectionState.CLOSED; - finalClose.compareAndSet(null,close); + finalClose.compareAndSet(null,closeInfo); event = this.state; } else if (this.state == ConnectionState.OPEN) { - // We are now entering CLOSING (or half-closed) + // We are now entering CLOSING (or half-closed). this.state = ConnectionState.CLOSING; event = this.state; - - // if abnormal, we don't expect an answer. - if (close.isAbnormal()) + + // If abnormal, we don't expect an answer. + if (closeInfo.isAbnormal()) { abnormalEvent = ConnectionState.CLOSED; - finalClose.compareAndSet(null,close); + finalClose.compareAndSet(null,closeInfo); cleanClose = false; outputAvailable = false; inputAvailable = false; @@ -303,12 +323,12 @@ public class IOState /** * A close handshake has been received from the remote endpoint - * @param close the close information + * @param closeInfo the close information */ - public void onCloseRemote(CloseInfo close) + public void onCloseRemote(CloseInfo closeInfo) { if (LOG.isDebugEnabled()) - LOG.debug("onCloseRemote({})",close); + LOG.debug("onCloseRemote({})", closeInfo); ConnectionState event = null; synchronized (this) { @@ -318,27 +338,25 @@ public class IOState return; } - closeInfo = close; - + if (LOG.isDebugEnabled()) + LOG.debug("onCloseRemote(), input={}, output={}", inputAvailable, outputAvailable); + + this.closeInfo = closeInfo; + // turn off further input inputAvailable = false; - boolean in = inputAvailable; - boolean out = outputAvailable; if (closeHandshakeSource == CloseHandshakeSource.NONE) { closeHandshakeSource = CloseHandshakeSource.REMOTE; } - if (LOG.isDebugEnabled()) - LOG.debug("onCloseRemote(), input={}, output={}",in,out); - - if (!in && !out) + if (!outputAvailable) { LOG.debug("Close Handshake satisfied, disconnecting"); cleanClose = true; state = ConnectionState.CLOSED; - finalClose.compareAndSet(null,close); + finalClose.compareAndSet(null,closeInfo); event = this.state; } else if (this.state == ConnectionState.OPEN)