WebSocketChannel bug fixes, cleanup and javadoc updates for FrameHandler

pass the cause from the AbnormalCloseStatus to closeConnection in
WSChannel

If onOpen throws we now throw a runtime exception in addition to
failing the callback, this is to fail the CompletableFuture in the
ClientUpgradeRequest.

Signed-off-by: lachan-roberts <lachlan@webtide.com>
This commit is contained in:
lachan-roberts 2019-03-06 14:15:04 +11:00
parent c63a578c29
commit ebbca84a87
3 changed files with 55 additions and 29 deletions

View File

@ -210,6 +210,31 @@ public interface Callback extends Invocable
};
}
/**
* Create a nested callback which always fails the nested callback on completion.
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
* will be added to this cause as a suppressed exception.
* @return a new callback.
*/
static Callback from(Callback callback, Throwable cause)
{
return new Callback()
{
@Override
public void succeeded()
{
callback.failed(cause);
}
@Override
public void failed(Throwable x)
{
cause.addSuppressed(x);
callback.failed(cause);
}
};
}
class Completing implements Callback
{

View File

@ -66,8 +66,13 @@ public interface FrameHandler extends IncomingFrames
/**
* Async notification that Connection is being opened.
* <p>
* FrameHandler can write during this call, but will not receive frames until
* the onOpen() completes.
* FrameHandler can write during this call, but can not receive frames until the callback is succeeded.
* </p>
* <p>
* If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if
* not demanding, or can now call {@link CoreSession#demand(long)} to receive frames if demanding.
* If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and
*the connection will be closed. <br>
* </p>
*
* @param coreSession the channel associated with this connection.
@ -81,9 +86,8 @@ public interface FrameHandler extends IncomingFrames
* sequentially to satisfy all outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}.
* Control and Data frames are passed to this method.
* Control frames that require a response (eg PING and CLOSE) may be responded to by the
* the handler, but if an appropriate response is not sent once the callback is succeeded,
* then a response will be generated and sent.
* Close frames may be responded to by the handler, but if an appropriate close response is not
* sent once the callback is succeeded, then a response close will be generated and sent.
*
* @param frame the raw frame
* @param callback the callback to indicate success in processing frame (or failure)
@ -93,7 +97,8 @@ public interface FrameHandler extends IncomingFrames
/**
* An error has occurred or been detected in websocket-core and being reported to FrameHandler.
* A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
* derived from the error.
* derived from the error. This will not be called more than once, {@link #onClosed(CloseStatus, Callback)}
* will be called on the callback completion.
*
* @param cause the reason for the error
* @param callback the callback to indicate success in processing (or failure)
@ -105,6 +110,7 @@ public interface FrameHandler extends IncomingFrames
* <p>
* The connection is now closed, no reading or writing is possible anymore.
* Implementations of FrameHandler can cleanup their resources for this connection now.
* This method will be called only once.
* </p>
*
* @param closeStatus the close status received from remote, or in the case of abnormal closure from local.

View File

@ -395,9 +395,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, NOOP);
close(closeStatus, callback);
else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback);
else
callback.failed(cause);
}
/**
@ -450,6 +452,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
catch (Throwable t)
{
openCallback.failed(t);
/* This is double handling of the exception but we need to do this because we have two separate
mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback
and the CompletableFuture require the exception. */
throw new RuntimeException(t);
}
}
@ -481,9 +488,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
assertValidIncoming(frame);
}
catch (Throwable ex)
catch (Throwable t)
{
callback.failed(ex);
callback.failed(t);
return;
}
@ -497,9 +504,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{
assertValidOutgoing(frame);
}
catch (Throwable ex)
catch (Throwable t)
{
callback.failed(ex);
callback.failed(t);
return;
}
@ -517,13 +524,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
Callback closeConnectionCallback = Callback.from(
()->closeConnection(cause, channelState.getCloseStatus(), callback),
x->closeConnection(cause, channelState.getCloseStatus(), Callback.from(
()-> callback.failed(x),
x2->
{
x.addSuppressed(x2);
callback.failed(x);
})));
t->closeConnection(cause, channelState.getCloseStatus(), Callback.from(callback, t)));
flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
}
@ -534,24 +535,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
}
flusher.iterate();
}
catch (Throwable ex)
catch (Throwable t)
{
if (frame.getOpCode() == OpCode.CLOSE)
{
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus))
closeConnection(null, closeStatus, Callback.from(
()->callback.failed(ex),
x2->
{
ex.addSuppressed(x2);
callback.failed(ex);
}));
closeConnection(AbnormalCloseStatus.getCause(closeStatus), closeStatus, Callback.from(callback, t));
else
callback.failed(ex);
callback.failed(t);
}
else
callback.failed(ex);
callback.failed(t);
}
}