433262 - WebSocket / Advanced close use cases

+ AWSC.Flusher.onFailure() now uses IOState properly.
+ IOState now tracks the final CLOSED CloseInfo atomically
+ Renamed IOState.onReadEOF() to .onReadFailure(Throwable)
+ Added IOState.onWriteFailure(Throwable)
This commit is contained in:
Joakim Erdfelt 2014-05-07 13:10:48 -07:00
parent fa5a5f3507
commit 8ff1cec570
2 changed files with 162 additions and 71 deletions

View File

@ -34,7 +34,6 @@ import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle; import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -42,7 +41,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler; import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseException; import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken; import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -72,6 +70,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override @Override
protected void onFailure(Throwable x) protected void onFailure(Throwable x)
{ {
session.notifyError(x);
if (ioState.wasAbnormalClose()) if (ioState.wasAbnormalClose())
{ {
LOG.ignore(x); LOG.ignore(x);
@ -79,34 +79,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
} }
LOG.debug("Write flush failure",x); LOG.debug("Write flush failure",x);
ioState.onWriteFailure(x);
// Unable to write? can't notify other side of close, so disconnect.
// This is an ABNORMAL closure
String reason = "Websocket write failure";
if (x instanceof EOFException)
{
reason = "EOF";
Throwable cause = x.getCause();
if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
{
reason = "EOF: " + cause.getMessage();
}
}
else
{
if (StringUtil.isNotBlank(x.getMessage()))
{
reason = x.getMessage();
}
}
// Abnormal Close
reason = CloseStatus.trimMaxReasonLength(reason);
session.notifyError(x);
session.notifyClose(StatusCode.ABNORMAL,reason);
disconnect(); // disconnect endpoint & connection
} }
} }
@ -563,7 +536,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
else if (filled < 0) else if (filled < 0)
{ {
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress()); LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
ioState.onReadEOF(); ioState.onReadFailure(new EOFException("Remote Read EOF"));
return -1; return -1;
} }
else else

View File

@ -18,12 +18,16 @@
package org.eclipse.jetty.websocket.common.io; package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode; import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.common.CloseInfo; import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState; import org.eclipse.jetty.websocket.common.ConnectionState;
@ -62,10 +66,38 @@ public class IOState
private ConnectionState state; private ConnectionState state;
private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>(); private final List<ConnectionStateListener> listeners = new CopyOnWriteArrayList<>();
/**
* Is input on websocket available (for reading frames).
* Used to determine close handshake completion, and track half-close states
*/
private boolean inputAvailable; private boolean inputAvailable;
/**
* Is output on websocket available (for writing frames).
* Used to determine close handshake completion, and track half-closed states.
*/
private boolean outputAvailable; private boolean outputAvailable;
/**
* Initiator of the close handshake.
* Used to determine who initiated a close handshake for reply reasons.
*/
private CloseHandshakeSource closeHandshakeSource; private CloseHandshakeSource closeHandshakeSource;
/**
* The close info for the initiator of the close handshake.
* It is possible in abnormal close scenarios to have a different
* final close info that is used to notify the WS-Endpoint's onClose()
* events with.
*/
private CloseInfo closeInfo; private CloseInfo closeInfo;
/**
* Atomic reference to the final close info.
* This can only be set once, and is used for the WS-Endpoint's onClose()
* event.
*/
private AtomicReference<CloseInfo> finalClose = new AtomicReference<>();
/**
* Tracker for if the close handshake was completed successfully by
* both sides. False if close was sudden or abnormal.
*/
private boolean cleanClose; private boolean cleanClose;
/** /**
@ -104,6 +136,11 @@ public class IOState
public CloseInfo getCloseInfo() public CloseInfo getCloseInfo()
{ {
CloseInfo ci = finalClose.get();
if (ci != null)
{
return ci;
}
return closeInfo; return closeInfo;
} }
@ -137,6 +174,7 @@ public class IOState
private void notifyStateListeners(ConnectionState state) private void notifyStateListeners(ConnectionState state)
{ {
LOG.debug("Notify State Listeners: {}",state);
for (ConnectionStateListener listener : listeners) for (ConnectionStateListener listener : listeners)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -170,7 +208,7 @@ public class IOState
} }
this.state = ConnectionState.CLOSED; this.state = ConnectionState.CLOSED;
this.closeInfo = close; finalClose.compareAndSet(null,close);
this.inputAvailable = false; this.inputAvailable = false;
this.outputAvailable = false; this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL; this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
@ -185,6 +223,7 @@ public class IOState
public void onCloseLocal(CloseInfo close) public void onCloseLocal(CloseInfo close)
{ {
ConnectionState event = null; ConnectionState event = null;
ConnectionState abnormalEvent = null;
ConnectionState initialState = this.state; ConnectionState initialState = this.state;
LOG.debug("onCloseLocal({}) : {}",close,initialState); LOG.debug("onCloseLocal({}) : {}",close,initialState);
if (initialState == ConnectionState.CLOSED) if (initialState == ConnectionState.CLOSED)
@ -223,6 +262,7 @@ public class IOState
LOG.debug("Close Handshake satisfied, disconnecting"); LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose = true; cleanClose = true;
this.state = ConnectionState.CLOSED; this.state = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close);
event = this.state; event = this.state;
} }
else if (this.state == ConnectionState.OPEN) else if (this.state == ConnectionState.OPEN)
@ -230,30 +270,27 @@ public class IOState
// We are now entering CLOSING (or half-closed) // We are now entering CLOSING (or half-closed)
this.state = ConnectionState.CLOSING; this.state = ConnectionState.CLOSING;
event = this.state; event = this.state;
// if abnormal, we don't expect an answer.
if (close.isAbnormal())
{
abnormalEvent = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close);
cleanClose = false;
outputAvailable = false;
inputAvailable = false;
closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
}
} }
} }
// Only notify on state change events // Only notify on state change events
if (event != null) if (event != null)
{ {
LOG.debug("notifying state listeners: {}",event);
notifyStateListeners(event); notifyStateListeners(event);
// if abnormal, we don't expect an answer. if(abnormalEvent != null) {
if (close.isAbnormal()) notifyStateListeners(abnormalEvent);
{
LOG.debug("Abnormal close, disconnecting");
synchronized (this)
{
state = ConnectionState.CLOSED;
cleanClose = false;
outputAvailable = false;
inputAvailable = false;
closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
event = this.state;
}
notifyStateListeners(event);
return;
} }
} }
} }
@ -291,6 +328,7 @@ public class IOState
LOG.debug("Close Handshake satisfied, disconnecting"); LOG.debug("Close Handshake satisfied, disconnecting");
cleanClose = true; cleanClose = true;
state = ConnectionState.CLOSED; state = ConnectionState.CLOSED;
finalClose.compareAndSet(null,close);
event = this.state; event = this.state;
} }
else if (this.state == ConnectionState.OPEN) else if (this.state == ConnectionState.OPEN)
@ -315,15 +353,15 @@ public class IOState
*/ */
public void onConnected() public void onConnected()
{ {
if (this.state != ConnectionState.CONNECTING)
{
LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
return;
}
ConnectionState event = null; ConnectionState event = null;
synchronized (this) synchronized (this)
{ {
if (this.state != ConnectionState.CONNECTING)
{
LOG.debug("Unable to set to connected, not in CONNECTING state: {}",this.state);
return;
}
this.state = ConnectionState.CONNECTED; this.state = ConnectionState.CONNECTED;
inputAvailable = false; // cannot read (yet) inputAvailable = false; // cannot read (yet)
outputAvailable = true; // write allowed outputAvailable = true; // write allowed
@ -355,21 +393,21 @@ public class IOState
*/ */
public void onOpened() public void onOpened()
{ {
if (this.state == ConnectionState.OPEN)
{
// already opened
return;
}
if (this.state != ConnectionState.CONNECTED)
{
LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
return;
}
ConnectionState event = null; ConnectionState event = null;
synchronized (this) synchronized (this)
{ {
if (this.state == ConnectionState.OPEN)
{
// already opened
return;
}
if (this.state != ConnectionState.CONNECTED)
{
LOG.debug("Unable to open, not in CONNECTED state: {}",this.state);
return;
}
this.state = ConnectionState.OPEN; this.state = ConnectionState.OPEN;
this.inputAvailable = true; this.inputAvailable = true;
this.outputAvailable = true; this.outputAvailable = true;
@ -379,11 +417,11 @@ public class IOState
} }
/** /**
* The local endpoint has reached a read EOF. * The local endpoint has reached a read failure.
* <p> * <p>
* This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect. * This could be a normal result after a proper close handshake, or even a premature close due to a connection disconnect.
*/ */
public void onReadEOF() public void onReadFailure(Throwable t)
{ {
ConnectionState event = null; ConnectionState event = null;
synchronized (this) synchronized (this)
@ -394,7 +432,29 @@ public class IOState
return; return;
} }
CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,"Read EOF"); // Build out Close Reason
String reason = "WebSocket Read Failure";
if (t instanceof EOFException)
{
reason = "WebSocket Read EOF";
Throwable cause = t.getCause();
if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
{
reason = "EOF: " + cause.getMessage();
}
}
else
{
if (StringUtil.isNotBlank(t.getMessage()))
{
reason = t.getMessage();
}
}
reason = CloseStatus.trimMaxReasonLength(reason);
CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
finalClose.compareAndSet(null,close);
this.cleanClose = false; this.cleanClose = false;
this.state = ConnectionState.CLOSED; this.state = ConnectionState.CLOSED;
@ -407,6 +467,56 @@ public class IOState
notifyStateListeners(event); notifyStateListeners(event);
} }
/**
* The local endpoint has reached a write failure.
* <p>
* A low level I/O failure, or even a jetty side EndPoint close (from idle timeout) are a few scenarios
*/
public void onWriteFailure(Throwable t)
{
ConnectionState event = null;
synchronized (this)
{
if (this.state == ConnectionState.CLOSED)
{
// already closed
return;
}
// Build out Close Reason
String reason = "WebSocket Write Failure";
if (t instanceof EOFException)
{
reason = "WebSocket Write EOF";
Throwable cause = t.getCause();
if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
{
reason = "EOF: " + cause.getMessage();
}
}
else
{
if (StringUtil.isNotBlank(t.getMessage()))
{
reason = t.getMessage();
}
}
reason = CloseStatus.trimMaxReasonLength(reason);
CloseInfo close = new CloseInfo(StatusCode.ABNORMAL,reason);
finalClose.compareAndSet(null,close);
this.cleanClose = false;
this.state = ConnectionState.CLOSED;
this.inputAvailable = false;
this.outputAvailable = false;
this.closeHandshakeSource = CloseHandshakeSource.ABNORMAL;
event = this.state;
}
notifyStateListeners(event);
}
public void onDisconnected() public void onDisconnected()
{ {
ConnectionState event = null; ConnectionState event = null;
@ -451,7 +561,15 @@ public class IOState
str.append("out"); str.append("out");
if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING)) if ((state == ConnectionState.CLOSED) || (state == ConnectionState.CLOSING))
{ {
str.append(",close=").append(closeInfo); CloseInfo ci = finalClose.get();
if (ci != null)
{
str.append(",finalClose=").append(ci);
}
else
{
str.append(",close=").append(closeInfo);
}
str.append(",clean=").append(cleanClose); str.append(",clean=").append(cleanClose);
str.append(",closeSource=").append(closeHandshakeSource); str.append(",closeSource=").append(closeHandshakeSource);
} }