Issue #272 - Attempting to centralize close logic in Session.close()

This commit is contained in:
Joakim Erdfelt 2017-10-03 15:29:27 -07:00
parent bb5195192a
commit c0dfa1dd50
6 changed files with 201 additions and 215 deletions

View File

@ -157,7 +157,7 @@ public interface Session extends Closeable
*
* @return whether the session is open
*/
abstract boolean isOpen();
boolean isOpen();
/**
* Return true if and only if the underlying socket is using a secure transport.

View File

@ -31,27 +31,10 @@ import org.eclipse.jetty.websocket.common.io.IOState;
public interface LogicalConnection extends OutgoingFrames, SuspendToken
{
/**
* Send a websocket Close frame, without a status code or reason.
* <p>
* Basic usage: results in an non-blocking async write, then connection close.
*
* @see org.eclipse.jetty.websocket.api.StatusCode
* @see #close(int, String)
* Called to indicate a close frame was successfully sent to the remote.
* @param close the close details
*/
public void close();
/**
* Send a websocket Close frame, with status code.
* <p>
* Advanced usage: results in an non-blocking async write, then connection close.
*
* @param statusCode
* the status code
* @param reason
* the (optional) reason. (can be null for no reason)
* @see org.eclipse.jetty.websocket.api.StatusCode
*/
public void close(int statusCode, String reason);
void onLocalClose(CloseInfo close);
/**
* Terminate the connection (no close frame sent)
@ -75,7 +58,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*
* @return the idle timeout in milliseconds
*/
public long getIdleTimeout();
long getIdleTimeout();
/**
* Get the IOState of the connection.
@ -119,7 +102,7 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*
* @return true if connection is open
*/
public boolean isOpen();
boolean isOpen();
/**
* Tests if the connection is actively reading.
@ -149,6 +132,13 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
*/
void setNextIncomingFrames(IncomingFrames incoming);
/**
* Associate the Active Session with the connection.
*
* @param session the session for this connection
*/
void setSession(WebSocketSession session);
/**
* Suspend a the incoming read events on the connection.
* @return the suspend token
@ -159,5 +149,5 @@ public interface LogicalConnection extends OutgoingFrames, SuspendToken
* Get Unique ID for the Connection
* @return the unique ID for the connection
*/
public String getId();
String getId();
}

View File

@ -29,9 +29,11 @@ import java.util.Objects;
import java.util.ServiceLoader;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedAttribute;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -51,11 +53,13 @@ import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.api.extensions.OutgoingFrames;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.frames.CloseFrame;
import org.eclipse.jetty.websocket.common.io.IOState;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.eclipse.jetty.websocket.common.scopes.WebSocketContainerScope;
@ -64,6 +68,68 @@ import org.eclipse.jetty.websocket.common.scopes.WebSocketSessionScope;
@ManagedObject("A Jetty WebSocket Session")
public class WebSocketSession extends ContainerLifeCycle implements Session, RemoteEndpointFactory, WebSocketSessionScope, IncomingFrames, Connection.Listener, ConnectionStateListener
{
public static class OnCloseLocalCallback implements WriteCallback
{
private final Callback callback;
private final LogicalConnection connection;
private final CloseInfo close;
public OnCloseLocalCallback(Callback callback, LogicalConnection connection, CloseInfo close)
{
this.callback = callback;
this.connection = connection;
this.close = close;
}
@Override
public void writeSuccess()
{
try
{
if (callback != null)
{
callback.succeeded();
}
}
finally
{
connection.onLocalClose(close);
}
}
@Override
public void writeFailed(Throwable x)
{
try
{
if (callback != null)
{
callback.failed(x);
}
}
finally
{
connection.onLocalClose(close);
}
}
}
public class DisconnectCallback implements Callback
{
@Override
public void failed(Throwable x)
{
disconnect();
}
@Override
public void succeeded()
{
disconnect();
}
}
private static final Logger LOG = Log.getLogger(WebSocketSession.class);
private static final Logger LOG_OPEN = Log.getLogger(WebSocketSession.class.getName() + "_OPEN");
private final WebSocketContainerScope containerScope;
@ -72,6 +138,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
private final EventDriver websocket;
private final Executor executor;
private final WebSocketPolicy policy;
private final AtomicBoolean closed = new AtomicBoolean();
private ClassLoader classLoader;
private ExtensionFactory extensionFactory;
private RemoteEndpointFactory remoteEndpointFactory;
@ -100,27 +167,59 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
this.connection.getIOState().addListener(this);
this.policy = websocket.getPolicy();
this.connection.setSession(this);
addBean(this.connection);
addBean(this.websocket);
}
/**
* Aborts the active session abruptly.
*/
public void abort(int statusCode, String reason)
{
close(new CloseInfo(statusCode, reason), new DisconnectCallback());
}
@Override
public void close()
{
/* This is assumed to always be a NORMAL closure, no reason phrase */
close(StatusCode.NORMAL, null);
close(new CloseInfo(StatusCode.NORMAL), null);
}
@Override
public void close(CloseStatus closeStatus)
{
close(closeStatus.getCode(),closeStatus.getPhrase());
close(new CloseInfo(closeStatus.getCode(),closeStatus.getPhrase()), null);
}
@Override
public void close(int statusCode, String reason)
{
connection.close(statusCode,reason);
close(new CloseInfo(statusCode, reason), null);
}
/**
* CLOSE Primary Entry Point.
*
* <ul>
* <li>atomically enqueue CLOSE frame + flip flag to reject more frames</li>
* <li>setup CLOSE frame callback: must close flusher</li>
* </ul>
*
* @param closeInfo the close details
*/
private void close(CloseInfo closeInfo, Callback callback)
{
if (LOG.isDebugEnabled())
LOG.debug("close({})", closeInfo);
if (closed.compareAndSet(false, true))
{
CloseFrame frame = closeInfo.asFrame();
connection.outgoingFrame(frame, new OnCloseLocalCallback(callback, connection, closeInfo), BatchMode.OFF);
}
}
/**
@ -388,7 +487,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
{
return false;
}
return this.connection.isOpen();
return !closed.get() && this.connection.isOpen();
}
@Override
@ -420,11 +519,21 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Rem
incomingError(cause);
}
/**
* Jetty Connection onClosed event
*
* @param connection the connection that was closed
*/
@Override
public void onClosed(Connection connection)
{
}
/**
* Jetty Connection onOpen event
*
* @param connection the connection that was opened
*/
@Override
public void onOpened(Connection connection)
{

View File

@ -26,7 +26,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@ -53,6 +52,7 @@ import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.Generator;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.Parser;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
/**
@ -60,8 +60,6 @@ import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
*/
public abstract class AbstractWebSocketConnection extends AbstractConnection implements LogicalConnection, Connection.UpgradeTo, ConnectionStateListener, Dumpable
{
private final AtomicBoolean closed = new AtomicBoolean();
private class Flusher extends FrameFlusher
{
private Flusher(ByteBufferPool bufferPool, Generator generator, EndPoint endpoint)
@ -86,91 +84,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
public class OnDisconnectCallback implements WriteCallback
{
private final boolean outputOnly;
public OnDisconnectCallback(boolean outputOnly)
{
this.outputOnly = outputOnly;
}
@Override
public void writeFailed(Throwable x)
{
disconnect(outputOnly);
}
@Override
public void writeSuccess()
{
disconnect(outputOnly);
}
}
public class OnCloseLocalCallback implements WriteCallback
{
private final WriteCallback callback;
private final CloseInfo close;
public OnCloseLocalCallback(WriteCallback callback, CloseInfo close)
{
this.callback = callback;
this.close = close;
}
public OnCloseLocalCallback(CloseInfo close)
{
this(null,close);
}
@Override
public void writeFailed(Throwable x)
{
try
{
if (callback != null)
{
callback.writeFailed(x);
}
}
finally
{
onLocalClose();
}
}
@Override
public void writeSuccess()
{
try
{
if (callback != null)
{
callback.writeSuccess();
}
}
finally
{
onLocalClose();
}
}
private void onLocalClose()
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("Local Close Confirmed {}",close);
if (close.isAbnormal())
{
ioState.onAbnormalClose(close);
}
else
{
ioState.onCloseLocal(close);
}
}
}
public static class Stats
{
private AtomicLong countFillInterestedEvents = new AtomicLong(0);
@ -193,7 +106,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
}
private static enum ReadMode
private enum ReadMode
{
PARSE,
DISCARD,
@ -201,8 +114,6 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
private static final Logger LOG = Log.getLogger(AbstractWebSocketConnection.class);
private static final Logger LOG_OPEN = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_OPEN");
private static final Logger LOG_CLOSE = Log.getLogger(AbstractWebSocketConnection.class.getName() + "_CLOSE");
/**
* Minimum size of a buffer is the determined to be what would be the maximum framing header size (not including payload)
@ -217,6 +128,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
private final AtomicBoolean suspendToken;
private final FrameFlusher flusher;
private final String id;
private WebSocketSession session;
private List<ExtensionConfig> extensions;
private boolean isFilling;
private ByteBuffer prefillBuffer;
@ -252,82 +164,58 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
return super.getExecutor();
}
@Override
public void onLocalClose(CloseInfo close)
{
if (LOG.isDebugEnabled())
LOG.debug("Local Close Confirmed {}",close);
if (close.isAbnormal())
{
ioState.onAbnormalClose(close);
}
else
{
ioState.onCloseLocal(close);
}
}
@Override
public void setSession(WebSocketSession session)
{
this.session = session;
}
/**
* Close without a close code or reason
* Jetty Connection Close
*/
@Override
public void close()
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("close()");
close(new CloseInfo());
}
/**
* Close the connection.
* <p> fillInterested();
* This can result in a close handshake over the network, or a simple local abnormal close
*
* @param statusCode
* the WebSocket status code.
* @param reason
* the (optional) reason string. (null is allowed)
* @see StatusCode
*/
@Override
public void close(int statusCode, String reason)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("close({},{})", statusCode, reason);
close(new CloseInfo(statusCode, reason));
}
private void close(CloseInfo closeInfo)
{
if (closed.compareAndSet(false, true))
outgoingFrame(closeInfo.asFrame(), new OnCloseLocalCallback(closeInfo), BatchMode.OFF);
session.close();
}
@Override
public void disconnect()
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} disconnect()",policy.getBehavior());
disconnect(false);
}
if (LOG.isDebugEnabled())
LOG.debug("{} disconnect()",policy.getBehavior());
try
{
flusher.close();
}
catch (Throwable ignored)
{
LOG.ignore(ignored);
}
private void disconnect(boolean onlyOutput)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} disconnect({})",policy.getBehavior(),onlyOutput?"outputOnly":"both");
// close FrameFlusher, we cannot write anymore at this point.
flusher.close();
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("Shutting down output {}",endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("Closing {}",endPoint);
endPoint.close();
}
}
protected void execute(Runnable task)
{
try
{
getExecutor().execute(task);
}
catch (RejectedExecutionException e)
{
if (LOG.isDebugEnabled())
LOG.debug("Job not dispatched: {}",task);
}
endPoint.close();
}
@Override
@ -414,7 +302,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public boolean isOpen()
{
return !closed.get();
return getEndPoint().isOpen();
}
@Override
@ -441,8 +329,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void onConnectionStateChange(ConnectionState state)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} Connection State Change: {}",policy.getBehavior(),state);
if (LOG.isDebugEnabled())
LOG.debug("{} Connection State Change: {}",policy.getBehavior(),state);
switch (state)
{
@ -464,29 +352,28 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
fillInterested();
break;
case CLOSED:
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
if (LOG.isDebugEnabled())
LOG.debug("CLOSED - wasAbnormalClose: {}", ioState.wasAbnormalClose());
if (ioState.wasAbnormalClose())
{
// Fire out a close frame, indicating abnormal shutdown, then disconnect
CloseInfo abnormal = new CloseInfo(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
outgoingFrame(abnormal.asFrame(),new OnDisconnectCallback(false),BatchMode.OFF);
session.close(StatusCode.SHUTDOWN,"Abnormal Close - " + ioState.getCloseInfo().getReason());
}
else
{
// Just disconnect
this.disconnect(false);
this.disconnect();
}
break;
case CLOSING:
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
if (LOG.isDebugEnabled())
LOG.debug("CLOSING - wasRemoteCloseInitiated: {}", ioState.wasRemoteCloseInitiated());
// First occurrence of .onCloseLocal or .onCloseRemote use
if (ioState.wasRemoteCloseInitiated())
{
CloseInfo close = ioState.getCloseInfo();
// reply to close handshake from remote
outgoingFrame(close.asFrame(),new OnCloseLocalCallback(new OnDisconnectCallback(true),close),BatchMode.OFF);
session.close(close.getStatusCode(), close.getReason());
}
default:
break;
@ -561,8 +448,8 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
@Override
public void onOpen()
{
if(LOG_OPEN.isDebugEnabled())
LOG_OPEN.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
if(LOG.isDebugEnabled())
LOG.debug("[{}] {}.onOpened()",policy.getBehavior(),this.getClass().getSimpleName());
super.onOpen();
this.ioState.onOpened();
}
@ -575,13 +462,13 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
{
IOState state = getIOState();
ConnectionState cstate = state.getConnectionState();
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
if (LOG.isDebugEnabled())
LOG.debug("{} Read Timeout - {}",policy.getBehavior(),cstate);
if (cstate == ConnectionState.CLOSED)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("onReadTimeout - Connection Already CLOSED");
if (LOG.isDebugEnabled())
LOG.debug("onReadTimeout - Connection Already CLOSED");
// close already completed, extra timeouts not relevant
// allow underlying connection and endpoint to disconnect on its own
return true;
@ -594,7 +481,7 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
finally
{
// This is an Abnormal Close condition
close(StatusCode.SHUTDOWN,"Idle Timeout");
session.close(StatusCode.SHUTDOWN,"Idle Timeout");
}
return false;
@ -628,14 +515,14 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
}
else if (filled < 0)
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("read - EOF Reached (remote: {})",getRemoteAddress());
if (LOG.isDebugEnabled())
LOG.debug("read - EOF Reached (remote: {})",getRemoteAddress());
return ReadMode.EOF;
}
else
{
if (LOG_CLOSE.isDebugEnabled())
LOG_CLOSE.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
if (LOG.isDebugEnabled())
LOG.debug("Discarded {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
}
}
@ -682,19 +569,21 @@ public abstract class AbstractWebSocketConnection extends AbstractConnection imp
catch (IOException e)
{
LOG.warn(e);
close(StatusCode.PROTOCOL,e.getMessage());
session.notifyError(e);
session.abort(StatusCode.PROTOCOL,e.getMessage());
return ReadMode.DISCARD;
}
catch (CloseException e)
{
LOG.debug(e);
close(e.getStatusCode(),e.getMessage());
session.notifyError(e);
session.close(e.getStatusCode(),e.getMessage());
return ReadMode.DISCARD;
}
catch (Throwable t)
{
LOG.warn(t);
close(StatusCode.ABNORMAL,t.getMessage());
session.abort(StatusCode.ABNORMAL,t.getMessage());
// TODO: should probably only switch to discard if a non-ws-endpoint error
return ReadMode.DISCARD;
}

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.ExecutorThreadPool;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
@ -35,6 +34,7 @@ import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.ConnectionState;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState.ConnectionStateListener;
import org.junit.rules.TestName;
@ -73,18 +73,14 @@ public class LocalWebSocketConnection implements LogicalConnection, IncomingFram
}
@Override
public void close()
public void setSession(WebSocketSession session)
{
close(StatusCode.NORMAL,null);
}
@Override
public void close(int statusCode, String reason)
public void onLocalClose(CloseInfo closeInfo)
{
if (LOG.isDebugEnabled())
LOG.debug("close({}, {})",statusCode,reason);
CloseInfo close = new CloseInfo(statusCode,reason);
ioState.onCloseLocal(close);
ioState.onCloseLocal(closeInfo);
}
public void connect()

View File

@ -30,7 +30,9 @@ import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.common.CloseInfo;
import org.eclipse.jetty.websocket.common.LogicalConnection;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.io.IOState;
public class DummyConnection implements LogicalConnection
@ -44,12 +46,12 @@ public class DummyConnection implements LogicalConnection
}
@Override
public void close()
public void setSession(WebSocketSession session)
{
}
@Override
public void close(int statusCode, String reason)
public void onLocalClose(CloseInfo close)
{
}