Add a Throwable case to EndPoint and Connection onClose notifications

Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2019-02-07 11:55:40 +11:00
parent 2a69fcc7ac
commit 11dfe70fc2
16 changed files with 81 additions and 58 deletions

View File

@ -108,11 +108,11 @@ public class HTTP2Connection extends AbstractConnection implements WriteFlusher.
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("HTTP2 Close {} ", this); LOG.debug("HTTP2 Close {} ", this);
super.onClose(); super.onClose(cause);
LifeCycle.stop(strategy); LifeCycle.stop(strategy);
} }

View File

@ -215,11 +215,15 @@ public abstract class AbstractConnection implements Connection
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
{
if (cause==null)
LOG.debug("onClose {}", this); LOG.debug("onClose {}", this);
else
LOG.debug("onClose " + this, cause);
}
for (Listener listener : _listeners) for (Listener listener : _listeners)
onClosed(listener); onClosed(listener);
} }

View File

@ -232,13 +232,6 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
{ {
} }
protected void onClose(Throwable failure)
{
super.onClose();
_writeFlusher.onFail(failure);
_fillInterest.onFail(failure);
}
@Override @Override
public boolean isOutputShutdown() public boolean isOutputShutdown()
{ {
@ -347,12 +340,27 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
} }
@Override @Override
public void onClose() public final void onClose()
{
onClose(null);
}
@Override
public void onClose(Throwable failure)
{ {
super.onClose(); super.onClose();
if (failure==null)
{
_writeFlusher.onClose(); _writeFlusher.onClose();
_fillInterest.onClose(); _fillInterest.onClose();
} }
else
{
_writeFlusher.onFail(failure);
_fillInterest.onFail(failure);
}
}
@Override @Override
public void fillInterested(Callback callback) public void fillInterested(Callback callback)
@ -427,7 +435,7 @@ public abstract class AbstractEndPoint extends IdleTimeout implements EndPoint
ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom) ? ByteBuffer prefilled = (old_connection instanceof Connection.UpgradeFrom) ?
((Connection.UpgradeFrom)old_connection).onUpgradeFrom() : ((Connection.UpgradeFrom)old_connection).onUpgradeFrom() :
null; null;
old_connection.onClose(); old_connection.onClose(null);
old_connection.getEndPoint().setConnection(newConnection); old_connection.getEndPoint().setConnection(newConnection);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())

View File

@ -208,16 +208,16 @@ public abstract class ChannelEndPoint extends AbstractEndPoint implements Manage
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
try try
{ {
super.onClose(); super.onClose(cause);
} }
finally finally
{ {
if (_selector!=null) if (_selector!=null)
_selector.destroyEndPoint(this); _selector.destroyEndPoint(this, cause);
} }
} }

View File

@ -56,8 +56,9 @@ public interface Connection extends Closeable
/** /**
* <p>Callback method invoked when this connection is closed.</p> * <p>Callback method invoked when this connection is closed.</p>
* <p>Creators of the connection implementation are responsible for calling this method.</p> * <p>Creators of the connection implementation are responsible for calling this method.</p>
* @param cause The cause of the close or null for a normal close
*/ */
public void onClose(); public void onClose(Throwable cause);
/** /**
* @return the {@link EndPoint} associated with this Connection. * @return the {@link EndPoint} associated with this Connection.

View File

@ -150,7 +150,16 @@ public interface EndPoint extends Closeable
* Close any backing stream associated with the endpoint * Close any backing stream associated with the endpoint
*/ */
@Override @Override
void close(); default void close()
{
close (null);
}
/**
* Close any backing stream associated with the endpoint, passing a cause
* @param cause the reason for the close or null
*/
void close(Throwable cause);
/** /**
* Fill the passed buffer with data from this endpoint. The bytes are appended to any * Fill the passed buffer with data from this endpoint. The bytes are appended to any
@ -248,15 +257,16 @@ public interface EndPoint extends Closeable
/** /**
* <p>Callback method invoked when this {@link EndPoint} is opened.</p> * <p>Callback method invoked when this {@link EndPoint} is opened.</p>
* @see #onClose() * @see #onClose(Throwable)
*/ */
void onOpen(); void onOpen();
/** /**
* <p>Callback method invoked when this {@link EndPoint} is close.</p> * <p>Callback method invoked when this {@link EndPoint} is close.</p>
* @param cause The reason for the close, or null if a normal close.
* @see #onOpen() * @see #onOpen()
*/ */
void onClose(); void onClose(Throwable cause);
/** Is the endpoint optimized for DirectBuffer usage /** Is the endpoint optimized for DirectBuffer usage
* @return True if direct buffers can be used optimally. * @return True if direct buffers can be used optimally.

View File

@ -270,13 +270,13 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Created {}", endPoint); LOG.debug("Created {}", endPoint);
} }
public void destroyEndPoint(final EndPoint endPoint) public void destroyEndPoint(final EndPoint endPoint, Throwable cause)
{ {
// Waking up the selector is necessary to clean the // Waking up the selector is necessary to clean the
// cancelled-key set and tell the TCP stack that the // cancelled-key set and tell the TCP stack that the
// socket is closed (so that senders receive RST). // socket is closed (so that senders receive RST).
wakeup(); wakeup();
execute(new DestroyEndPoint(endPoint)); execute(new DestroyEndPoint(endPoint, cause));
} }
private int getActionSize() private int getActionSize()
@ -942,10 +942,12 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
private class DestroyEndPoint implements Runnable, Closeable private class DestroyEndPoint implements Runnable, Closeable
{ {
private final EndPoint endPoint; private final EndPoint endPoint;
private final Throwable cause;
public DestroyEndPoint(EndPoint endPoint) public DestroyEndPoint(EndPoint endPoint, Throwable cause)
{ {
this.endPoint = endPoint; this.endPoint = endPoint;
this.cause = cause;
} }
@Override @Override
@ -955,7 +957,7 @@ public class ManagedSelector extends ContainerLifeCycle implements Dumpable
LOG.debug("Destroyed {}", endPoint); LOG.debug("Destroyed {}", endPoint);
Connection connection = endPoint.getConnection(); Connection connection = endPoint.getConnection();
if (connection != null) if (connection != null)
_selectorManager.connectionClosed(connection); _selectorManager.connectionClosed(connection, cause);
_selectorManager.endPointClosed(endPoint); _selectorManager.endPointClosed(endPoint);
} }

View File

@ -94,9 +94,9 @@ public class NetworkTrafficSelectChannelEndPoint extends SocketChannelEndPoint
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
super.onClose(); super.onClose(cause);
if (listeners != null && !listeners.isEmpty()) if (listeners != null && !listeners.isEmpty())
{ {
for (NetworkTrafficListener listener : listeners) for (NetworkTrafficListener listener : listeners)

View File

@ -337,12 +337,13 @@ public abstract class SelectorManager extends ContainerLifeCycle implements Dump
* <p>Callback method invoked when a connection is closed.</p> * <p>Callback method invoked when a connection is closed.</p>
* *
* @param connection the connection just closed * @param connection the connection just closed
* @param cause the cause of the close or null for normal close
*/ */
public void connectionClosed(Connection connection) public void connectionClosed(Connection connection, Throwable cause)
{ {
try try
{ {
connection.onClose(); connection.onClose(cause);
} }
catch (Throwable x) catch (Throwable x)
{ {

View File

@ -284,10 +284,10 @@ public class SslConnection extends AbstractConnection implements Connection.Upgr
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
_decryptedEndPoint.getConnection().onClose(); _decryptedEndPoint.getConnection().onClose(cause);
super.onClose(); super.onClose(cause);
} }
@Override @Override

View File

@ -203,9 +203,9 @@ public class SslConnectionTest
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
super.onClose(); super.onClose(cause);
} }
@Override @Override

View File

@ -511,10 +511,13 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
if (cause==null)
_sendCallback.close(); _sendCallback.close();
super.onClose(); else
_sendCallback.failed(cause);
super.onClose(cause);
} }
@Override @Override

View File

@ -324,13 +324,13 @@ public class LocalConnector extends AbstractConnector
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
Connection connection = getConnection(); Connection connection = getConnection();
if (connection!=null) if (connection!=null)
connection.onClose(); connection.onClose(cause);
LocalConnector.this.onEndPointClosed(this); LocalConnector.this.onEndPointClosed(this);
super.onClose(); super.onClose(cause);
_closed.countDown(); _closed.countDown();
} }

View File

@ -661,9 +661,9 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
} }
@Override @Override
public void close() public void close(Throwable cause)
{ {
_endp.close(); _endp.close(cause);
} }
@Override @Override
@ -739,9 +739,9 @@ public class ProxyConnectionFactory extends AbstractConnectionFactory
} }
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
_endp.onClose(); _endp.onClose(cause);
} }
@Override @Override

View File

@ -41,6 +41,7 @@ public class FrameFlusher extends IteratingCallback
{ {
public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY); public static final Frame FLUSH_FRAME = new Frame(OpCode.BINARY);
private static final Logger LOG = Log.getLogger(FrameFlusher.class); private static final Logger LOG = Log.getLogger(FrameFlusher.class);
private static final Throwable CLOSED_CHANNEL = new ClosedChannelException();
private final ByteBufferPool bufferPool; private final ByteBufferPool bufferPool;
private final EndPoint endPoint; private final EndPoint endPoint;
@ -86,14 +87,11 @@ public class FrameFlusher extends IteratingCallback
return failure==null; return failure==null;
} }
public void onClose(Throwable t) public void onClose(Throwable cause)
{ {
if (t == null)
t = new ClosedChannelException();
synchronized (this) synchronized (this)
{ {
closedCause = t; closedCause = cause == null ? CLOSED_CHANNEL : cause;
} }
iterate(); iterate();

View File

@ -163,19 +163,15 @@ public class WebSocketConnection extends AbstractConnection implements Connectio
* Not related to WebSocket close handshake. * Not related to WebSocket close handshake.
*/ */
@Override @Override
public void onClose() public void onClose(Throwable cause)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("onClose() of physical connection"); LOG.debug("onClose() of physical connection");
Throwable t = new ClosedChannelException();
if (!channel.isClosed()) if (!channel.isClosed())
channel.onEof(); channel.onEof();
flusher.onClose(cause);
flusher.onClose(t); super.onClose(cause);
super.onClose();
} }