diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java index c88c4b3896b..9f0ead3d48d 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/AbstractWebSocketConnection.java @@ -34,7 +34,6 @@ import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; @@ -42,7 +41,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.CloseException; -import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.WebSocketPolicy; @@ -72,6 +70,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp @Override protected void onFailure(Throwable x) { + session.notifyError(x); + if (ioState.wasAbnormalClose()) { LOG.ignore(x); @@ -79,34 +79,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp } LOG.debug("Write flush failure",x); - - // Unable to write? can't notify other side of close, so disconnect. - // This is an ABNORMAL closure - String reason = "Websocket write failure"; - - if (x instanceof EOFException) - { - reason = "EOF"; - Throwable cause = x.getCause(); - if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage()))) - { - reason = "EOF: " + cause.getMessage(); - } - } - else - { - if (StringUtil.isNotBlank(x.getMessage())) - { - reason = x.getMessage(); - } - } - - // Abnormal Close - reason = CloseStatus.trimMaxReasonLength(reason); - session.notifyError(x); - session.notifyClose(StatusCode.ABNORMAL,reason); - - disconnect(); // disconnect endpoint & connection + ioState.onWriteFailure(x); } } @@ -563,7 +536,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp else if (filled < 0) { LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress()); - ioState.onReadEOF(); + ioState.onReadFailure(new EOFException("Remote Read EOF")); return -1; } else diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java index 815d2d30ad0..35c87d91786 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/IOState.java @@ -18,12 +18,16 @@ package org.eclipse.jetty.websocket.common.io; +import java.io.EOFException; import java.io.IOException; import java.util.List; import java.util.concurrent.CopyOnWriteArrayList; +import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.ConnectionState; @@ -62,10 +66,38 @@ public class IOState private ConnectionState state; private final List listeners = new CopyOnWriteArrayList<>(); + /** + * Is input on websocket available (for reading frames). + * Used to determine close handshake completion, and track half-close states + */ private boolean inputAvailable; + /** + * Is output on websocket available (for writing frames). + * Used to determine close handshake completion, and track half-closed states. + */ private boolean outputAvailable; + /** + * Initiator of the close handshake. + * Used to determine who initiated a close handshake for reply reasons. + */ private CloseHandshakeSource closeHandshakeSource; + /** + * The close info for the initiator of the close handshake. + * It is possible in abnormal close scenarios to have a different + * final close info that is used to notify the WS-Endpoint's onClose() + * events with. + */ private CloseInfo closeInfo; + /** + * Atomic reference to the final close info. + * This can only be set once, and is used for the WS-Endpoint's onClose() + * event. + */ + private AtomicReference finalClose = new AtomicReference<>(); + /** + * Tracker for if the close handshake was completed successfully by + * both sides. False if close was sudden or abnormal. + */ private boolean cleanClose; /** @@ -104,6 +136,11 @@ public class IOState public CloseInfo getCloseInfo() { + CloseInfo ci = finalClose.get(); + if (ci != null) + { + return ci; + } return closeInfo; } @@ -137,6 +174,7 @@ public class IOState private void notifyStateListeners(ConnectionState state) { + LOG.debug("Notify State Listeners: {}",state); for (ConnectionStateListener listener : listeners) { if (LOG.isDebugEnabled()) @@ -170,7 +208,7 @@ public class IOState } this.state = ConnectionState.CLOSED; - this.closeInfo = close; + finalClose.compareAndSet(null,close); this.inputAvailable = false; this.outputAvailable = false; this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL; @@ -185,6 +223,7 @@ public class IOState public void onCloseLocal(CloseInfo close) { ConnectionState event = null; + ConnectionState abnormalEvent = null; ConnectionState initialState = this.state; LOG.debug("onCloseLocal({}) : {}",close,initialState); if (initialState == ConnectionState.CLOSED) @@ -223,6 +262,7 @@ public class IOState LOG.debug("Close Handshake satisfied, disconnecting"); cleanClose = true; this.state = ConnectionState.CLOSED; + finalClose.compareAndSet(null,close); event = this.state; } else if (this.state == ConnectionState.OPEN) @@ -230,30 +270,27 @@ public class IOState // We are now entering CLOSING (or half-closed) this.state = ConnectionState.CLOSING; event = this.state; + + // if abnormal, we don't expect an answer. + if (close.isAbnormal()) + { + abnormalEvent = ConnectionState.CLOSED; + finalClose.compareAndSet(null,close); + cleanClose = false; + outputAvailable = false; + inputAvailable = false; + closeHandshakeSource = CloseHandshakeSource.ABNORMAL; + } } } // Only notify on state change events if (event != null) { - LOG.debug("notifying state listeners: {}",event); notifyStateListeners(event); - - // if abnormal, we don't expect an answer. - if (close.isAbnormal()) - { - LOG.debug("Abnormal close, disconnecting"); - synchronized (this) - { - state = ConnectionState.CLOSED; - cleanClose = false; - outputAvailable = false; - inputAvailable = false; - closeHandshakeSource = CloseHandshakeSource.ABNORMAL; - event = this.state; - } - notifyStateListeners(event); - return; + + if(abnormalEvent != null) { + notifyStateListeners(abnormalEvent); } } } @@ -291,6 +328,7 @@ public class IOState LOG.debug("Close Handshake satisfied, disconnecting"); cleanClose = true; state = ConnectionState.CLOSED; + finalClose.compareAndSet(null,close); event = this.state; } else if (this.state == ConnectionState.OPEN) @@ -315,15 +353,15 @@ public class IOState */ public void onConnected() { - if (this.state != ConnectionState.CONNECTING) - { - LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state); - return; - } - ConnectionState event = null; synchronized (this) { + if (this.state != ConnectionState.CONNECTING) + { + LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state); + return; + } + this.state = ConnectionState.CONNECTED; inputAvailable = false; // cannot read (yet) outputAvailable = true; // write allowed @@ -355,21 +393,21 @@ public class IOState */ public void onOpened() { - if (this.state == ConnectionState.OPEN) - { - // already opened - return; - } - - if (this.state != ConnectionState.CONNECTED) - { - LOG.debug("Unable to open, not in CONNECTED state: {}",this.state); - return; - } - ConnectionState event = null; synchronized (this) { + if (this.state == ConnectionState.OPEN) + { + // already opened + return; + } + + if (this.state != ConnectionState.CONNECTED) + { + LOG.debug("Unable to open, not in CONNECTED state: {}",this.state); + return; + } + this.state = ConnectionState.OPEN; this.inputAvailable = true; this.outputAvailable = true; @@ -379,11 +417,11 @@ public class IOState } /** - * The local endpoint has reached a read EOF. + * The local endpoint has reached a read failure. *

* This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect. */ - public void onReadEOF() + public void onReadFailure(Throwable t) { ConnectionState event = null; synchronized (this) @@ -394,7 +432,29 @@ public class IOState return; } - CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Read EOF"); + // Build out Close Reason + String reason = "WebSocket Read Failure"; + if (t instanceof EOFException) + { + reason = "WebSocket Read EOF"; + Throwable cause = t.getCause(); + if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage()))) + { + reason = "EOF: " + cause.getMessage(); + } + } + else + { + if (StringUtil.isNotBlank(t.getMessage())) + { + reason = t.getMessage(); + } + } + + reason = CloseStatus.trimMaxReasonLength(reason); + CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason); + + finalClose.compareAndSet(null,close); this.cleanClose = false; this.state = ConnectionState.CLOSED; @@ -407,6 +467,56 @@ public class IOState notifyStateListeners(event); } + /** + * The local endpoint has reached a write failure. + *

+ * A low level I/O failure, or even a jetty side EndPoint close (from idle timeout) are a few scenarios + */ + public void onWriteFailure(Throwable t) + { + ConnectionState event = null; + synchronized (this) + { + if (this.state == ConnectionState.CLOSED) + { + // already closed + return; + } + + // Build out Close Reason + String reason = "WebSocket Write Failure"; + if (t instanceof EOFException) + { + reason = "WebSocket Write EOF"; + Throwable cause = t.getCause(); + if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage()))) + { + reason = "EOF: " + cause.getMessage(); + } + } + else + { + if (StringUtil.isNotBlank(t.getMessage())) + { + reason = t.getMessage(); + } + } + + reason = CloseStatus.trimMaxReasonLength(reason); + CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason); + + finalClose.compareAndSet(null,close); + + this.cleanClose = false; + this.state = ConnectionState.CLOSED; + this.inputAvailable = false; + this.outputAvailable = false; + this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL; + event = this.state; + } + notifyStateListeners(event); + } + public void onDisconnected() { ConnectionState event = null; @@ -451,7 +561,15 @@ public class IOState str.append("out"); if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING)) { - str.append(",close=").append(closeInfo); + CloseInfo ci = finalClose.get(); + if (ci != null) + { + str.append(",finalClose=").append(ci); + } + else + { + str.append(",close=").append(closeInfo); + } str.append(",clean=").append(cleanClose); str.append(",closeSource=").append(closeHandshakeSource); }