402008 - Strange behavior when clients are suddenly killed

+ Fixing close and callback failure notificiations to address issues
   with suddenly killed clients.
This commit is contained in:
Joakim Erdfelt 2013-03-05 13:10:55 -07:00
parent 0f415a735a
commit 2d74857ffa
5 changed files with 201 additions and 73 deletions

View File

@ -22,6 +22,26 @@ public class CloseStatus
{
private static final int MAX_CONTROL_PAYLOAD = 125;
private static final int MAX_REASON_PHRASE = MAX_CONTROL_PAYLOAD - 2;
/**
* Convenience method for trimming a long reason phrase at the maximum reason phrase length.
*
* @param reason
* the proposed reason phrase
* @return the reason phrase (trimmed if needed)
*/
public static String trimMaxReasonLength(String reason)
{
if (reason.length() > MAX_REASON_PHRASE)
{
return reason.substring(0,MAX_REASON_PHRASE);
}
else
{
return reason;
}
}
private int code;
private String phrase;

View File

@ -113,7 +113,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
public void close(int statusCode, String reason)
{
connection.close(statusCode,reason);
websocket.onClose(new CloseInfo(statusCode,reason));
notifyClose(statusCode,reason);
}
/**
@ -125,7 +125,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
connection.disconnect();
// notify of harsh disconnect
websocket.onClose(new CloseInfo(StatusCode.NO_CLOSE,"Harsh disconnect"));
notifyClose(StatusCode.NO_CLOSE,"Harsh disconnect");
}
@Override
@ -153,6 +153,36 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
}
}
@Override
public boolean equals(Object obj)
{
if (this == obj)
{
return true;
}
if (obj == null)
{
return false;
}
if (getClass() != obj.getClass())
{
return false;
}
WebSocketSession other = (WebSocketSession)obj;
if (connection == null)
{
if (other.connection != null)
{
return false;
}
}
else if (!connection.equals(other.connection))
{
return false;
}
return true;
}
public LogicalConnection getConnection()
{
return connection;
@ -236,6 +266,15 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return this.upgradeResponse;
}
@Override
public int hashCode()
{
final int prime = 31;
int result = 1;
result = (prime * result) + ((connection == null)?0:connection.hashCode());
return result;
}
/**
* Incoming Errors from Parser
*/
@ -288,6 +327,11 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Inc
return "wss".equalsIgnoreCase(requestURI.getScheme());
}
public void notifyClose(int statusCode, String reason)
{
websocket.onClose(new CloseInfo(statusCode,reason));
}
/**
* Open/Activate the session
*

View File

@ -249,6 +249,22 @@ public class ExtensionStack extends ContainerLifeCycle implements IncomingFrames
@Override
public String toString()
{
return String.format("ExtensionStack[extensions=%s]",extensions);
StringBuilder s = new StringBuilder();
s.append("ExtensionStack[");
s.append("extensions=[");
boolean delim = false;
for (Extension ext : extensions)
{
if (delim)
{
s.append(',');
}
s.append(ext.getName());
delim = true;
}
s.append("],incoming=").append(this.nextIncoming.getClass().getName());
s.append(",outgoing=").append(this.nextOutgoing.getClass().getName());
s.append("]");
return s.toString();
}
}

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
@ -35,10 +36,12 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.ForkInvoker;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketException;
@ -60,10 +63,40 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
private class FlushCallback implements Callback
{
/**
* The Endpoint.write() failure path
*/
@Override
public void failed(Throwable x)
{
LOG.debug("Write flush failure",x);
// Unable to write? can't notify other side of close, so disconnect.
// This is an ABNORMAL closure
String reason = "Websocket write failure";
if (x instanceof EOFException)
{
reason = "EOF";
Throwable cause = x.getCause();
if ((cause != null) && (StringUtil.isNotBlank(cause.getMessage())))
{
reason = "EOF: " + cause.getMessage();
}
}
else
{
if (StringUtil.isNotBlank(x.getMessage()))
{
reason = x.getMessage();
}
}
// Abnormal Close
reason = CloseStatus.trimMaxReasonLength(reason);
session.notifyClose(StatusCode.NO_CLOSE,reason);
disconnect(); // disconnect endpoint & connection
}
@Override
@ -122,20 +155,24 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
public static class Stats {
public static class Stats
{
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
private AtomicLong countOnFillableEvents = new AtomicLong(0);
private AtomicLong countFillableErrors = new AtomicLong(0);
public long getFillableErrorCount() {
public long getFillableErrorCount()
{
return countFillableErrors.get();
}
public long getFillInterestedCount() {
public long getFillInterestedCount()
{
return countFillInterestedEvents.get();
}
public long getOnFillableCount() {
public long getOnFillableCount()
{
return countOnFillableEvents.get();
}
}
@ -278,12 +315,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
synchronized (writeBytes)
{
if (writeBytes.isFailed())
{
LOG.debug(".flush() - queue is in failed state");
return;
}
if (flushing)
{
return;
@ -398,11 +429,17 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return isFilling;
}
/**
* Physical connection disconnect.
* <p>
* Not related to WebSocket close handshake.
*/
@Override
public void onClose()
{
super.onClose();
this.getIOState().setState(ConnectionState.CLOSED);
writeBytes.close();
}
@Override
@ -459,7 +496,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
if ((state.getState() == ConnectionState.CLOSING) || (state.getState() == ConnectionState.CLOSED))
{
// close already initiated, extra timeouts not relevant
// allow udnerlying connection and endpoint to disconnect on its own
// allow underlying connection and endpoint to disconnect on its own
return true;
}
@ -467,6 +504,9 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
// Note: it is not possible in 100% of cases during read timeout to send this close frame.
session.close(StatusCode.NORMAL,"Idle Timeout");
// Force closure of writeBytes
writeBytes.close();
return false;
}
@ -489,15 +529,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
LOG.debug("outgoingFrame({}, {})",frame,callback);
}
if (!isOpen())
{
return;
}
synchronized (writeBytes)
{
writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
}
writeBytes.enqueue(frame,WriteCallbackWrapper.wrap(callback));
flush();
}
@ -516,7 +548,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
else if (filled < 0)
{
LOG.debug("read - EOF Reached");
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
return -1;
}
else
@ -568,7 +600,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void setInputBufferSize(int inputBufferSize)
{
if(inputBufferSize < MIN_BUFFER_SIZE) {
if (inputBufferSize < MIN_BUFFER_SIZE)
{
throw new IllegalArgumentException("Cannot have buffer size less than " + MIN_BUFFER_SIZE);
}
super.setInputBufferSize(inputBufferSize);

View File

@ -18,6 +18,7 @@
package org.eclipse.jetty.websocket.common.io;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.LinkedList;
@ -40,6 +41,7 @@ public class WriteBytesProvider implements Callback
{
private class FrameEntry
{
protected final AtomicBoolean failed = new AtomicBoolean(false);
protected final Frame frame;
protected final Callback callback;
@ -58,6 +60,14 @@ public class WriteBytesProvider implements Callback
}
return buffer;
}
public void notifyFailure(Throwable t)
{
if (failed.getAndSet(true) == false)
{
notifySafeFailure(callback,t);
}
}
}
private static final Logger LOG = Log.getLogger(WriteBytesProvider.class);
@ -72,7 +82,7 @@ public class WriteBytesProvider implements Callback
private int bufferSize = 2048;
/** Currently active frame */
private FrameEntry active;
/** Failure state for the entire WriteBytesProvider */
/** Tracking for failure */
private Throwable failure;
/** The last requested buffer */
private ByteBuffer buffer;
@ -97,6 +107,17 @@ public class WriteBytesProvider implements Callback
this.closed = new AtomicBoolean(false);
}
/**
* Force closure of write bytes
*/
public void close()
{
// Set queue closed, no new enqueue allowed.
this.closed.set(true);
// flush out backlog in queue
failAll(new EOFException("Connection has been disconnected"));
}
public void enqueue(Frame frame, Callback callback)
{
Objects.requireNonNull(frame);
@ -106,7 +127,7 @@ public class WriteBytesProvider implements Callback
if (closed.get())
{
// Closed for more frames.
LOG.debug("Write is closed: {}",frame,callback);
LOG.debug("Write is closed: {} {}",frame,callback);
if (callback != null)
{
callback.failed(new IOException("Write is closed"));
@ -114,10 +135,11 @@ public class WriteBytesProvider implements Callback
return;
}
if (isFailed())
if (failure != null)
{
// no changes when failed
notifyFailure(callback);
LOG.debug("Write is in failure: {} {}",frame,callback);
notifySafeFailure(callback,failure);
return;
}
@ -143,28 +165,31 @@ public class WriteBytesProvider implements Callback
{
synchronized (this)
{
if (isFailed())
// fail active (if set)
if (active != null)
{
// already failed.
return;
active.notifyFailure(t);
}
failure = t;
// fail others
for (FrameEntry fe : queue)
{
notifyFailure(fe.callback);
fe.notifyFailure(t);
}
queue.clear();
// notify flush callback
flushCallback.failed(failure);
flushCallback.failed(t);
}
}
/**
* Write of ByteBuffer failed.
* Callback failure.
* <p>
* Conditions: for Endpoint.write() failure.
*
* @param cause
* the cause of the failure
@ -211,11 +236,6 @@ public class WriteBytesProvider implements Callback
return buffer;
}
public Throwable getFailure()
{
return failure;
}
/**
* Used to test for the final frame possible to be enqueued, the CLOSE frame.
*
@ -229,24 +249,16 @@ public class WriteBytesProvider implements Callback
}
}
public boolean isFailed()
private void notifySafeFailure(Callback callback, Throwable t)
{
return (failure != null);
}
/**
* Notify specific callback of failure.
*
* @param callback
* the callback to notify
*/
private void notifyFailure(Callback callback)
{
if (callback == null)
try
{
return;
callback.failed(t);
}
catch (Throwable e)
{
LOG.warn("Uncaught exception",e);
}
callback.failed(failure);
}
/**
@ -268,6 +280,8 @@ public class WriteBytesProvider implements Callback
@Override
public void succeeded()
{
Callback successCallback = null;
synchronized (this)
{
// Release the active byte buffer first
@ -281,27 +295,28 @@ public class WriteBytesProvider implements Callback
if (active.frame.remaining() <= 0)
{
// All done with active FrameEntry
if (active.callback != null)
{
try
{
// TODO: should probably have callback invoked in new thread as part of scheduler
// notify of success
active.callback.succeeded();
}
catch (Throwable t)
{
LOG.warn("Callback failure",t);
}
}
// null it out
successCallback = active.callback;
// Forget active
active = null;
}
// notify flush callback
flushCallback.succeeded();
}
// Notify success (outside of synchronize lock)
if (successCallback != null)
{
try
{
// notify of success
successCallback.succeeded();
}
catch (Throwable t)
{
LOG.warn("Callback failure",t);
}
}
}
@Override
@ -310,10 +325,10 @@ public class WriteBytesProvider implements Callback
StringBuilder b = new StringBuilder();
b.append("WriteBytesProvider[");
b.append("flushCallback=").append(flushCallback);
if (isFailed())
if (failure != null)
{
b.append(",FAILURE=").append(failure.getClass().getName());
b.append(",").append(failure.getMessage());
b.append(",failure=").append(failure.getClass().getName());
b.append(":").append(failure.getMessage());
}
else
{