Merge pull request #3432 from lachlan-roberts/jetty-10.0.x-websocket-fixes-from-proxy

WebSocketChannel bug fixes, cleanup and javadoc updates for FrameHandler
This commit is contained in:
Greg Wilkins 2019-03-06 17:59:58 +11:00 committed by GitHub
commit 4bb6a920c0
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 57 additions and 30 deletions

View File

@ -210,6 +210,31 @@ public interface Callback extends Invocable
}; };
} }
/**
* Create a nested callback which always fails the nested callback on completion.
* @param callback The nested callback
* @param cause The cause to fail the nested callback, if the new callback is failed the reason
* will be added to this cause as a suppressed exception.
* @return a new callback.
*/
static Callback from(Callback callback, Throwable cause)
{
return new Callback()
{
@Override
public void succeeded()
{
callback.failed(cause);
}
@Override
public void failed(Throwable x)
{
cause.addSuppressed(x);
callback.failed(cause);
}
};
}
class Completing implements Callback class Completing implements Callback
{ {

View File

@ -135,7 +135,8 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request, UpgradeListener listener) throws IOException public CompletableFuture<Session> connect(Object websocket, URI toUri, UpgradeRequest request, UpgradeListener listener) throws IOException
{ {
JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(this, coreClient, request, toUri, websocket); JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(this, coreClient, request, toUri, websocket);
upgradeRequest.addListener(listener); if (listener != null)
upgradeRequest.addListener(listener);
coreClient.connect(upgradeRequest); coreClient.connect(upgradeRequest);
return upgradeRequest.getFutureSession(); return upgradeRequest.getFutureSession();
} }

View File

@ -66,8 +66,13 @@ public interface FrameHandler extends IncomingFrames
/** /**
* Async notification that Connection is being opened. * Async notification that Connection is being opened.
* <p> * <p>
* FrameHandler can write during this call, but will not receive frames until * FrameHandler can write during this call, but can not receive frames until the callback is succeeded.
* the onOpen() completes. * </p>
* <p>
* If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if
* not demanding, or can now call {@link CoreSession#demand(long)} to receive frames if demanding.
* If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and
*the connection will be closed. <br>
* </p> * </p>
* *
* @param coreSession the channel associated with this connection. * @param coreSession the channel associated with this connection.
@ -81,9 +86,8 @@ public interface FrameHandler extends IncomingFrames
* sequentially to satisfy all outstanding demand signaled by calls to * sequentially to satisfy all outstanding demand signaled by calls to
* {@link CoreSession#demand(long)}. * {@link CoreSession#demand(long)}.
* Control and Data frames are passed to this method. * Control and Data frames are passed to this method.
* Control frames that require a response (eg PING and CLOSE) may be responded to by the * Close frames may be responded to by the handler, but if an appropriate close response is not
* the handler, but if an appropriate response is not sent once the callback is succeeded, * sent once the callback is succeeded, then a response close will be generated and sent.
* then a response will be generated and sent.
* *
* @param frame the raw frame * @param frame the raw frame
* @param callback the callback to indicate success in processing frame (or failure) * @param callback the callback to indicate success in processing frame (or failure)
@ -93,7 +97,8 @@ public interface FrameHandler extends IncomingFrames
/** /**
* An error has occurred or been detected in websocket-core and being reported to FrameHandler. * An error has occurred or been detected in websocket-core and being reported to FrameHandler.
* A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status * A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status
* derived from the error. * derived from the error. This will not be called more than once, {@link #onClosed(CloseStatus, Callback)}
* will be called on the callback completion.
* *
* @param cause the reason for the error * @param cause the reason for the error
* @param callback the callback to indicate success in processing (or failure) * @param callback the callback to indicate success in processing (or failure)
@ -105,6 +110,7 @@ public interface FrameHandler extends IncomingFrames
* <p> * <p>
* The connection is now closed, no reading or writing is possible anymore. * The connection is now closed, no reading or writing is possible anymore.
* Implementations of FrameHandler can cleanup their resources for this connection now. * Implementations of FrameHandler can cleanup their resources for this connection now.
* This method will be called only once.
* </p> * </p>
* *
* @param closeStatus the close status received from remote, or in the case of abnormal closure from local. * @param closeStatus the close status received from remote, or in the case of abnormal closure from local.

View File

@ -395,9 +395,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
CloseStatus closeStatus = abnormalCloseStatusFor(cause); CloseStatus closeStatus = abnormalCloseStatusFor(cause);
if (closeStatus.getCode() == CloseStatus.PROTOCOL) if (closeStatus.getCode() == CloseStatus.PROTOCOL)
close(closeStatus, NOOP); close(closeStatus, callback);
else if (channelState.onClosed(closeStatus)) else if (channelState.onClosed(closeStatus))
closeConnection(cause, closeStatus, callback); closeConnection(cause, closeStatus, callback);
else
callback.failed(cause);
} }
/** /**
@ -428,7 +430,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("ConnectionState: Transition to CONNECTED"); LOG.debug("ConnectionState: Transition to CONNECTED");
Callback openCallback = Callback.from(()-> Callback openCallback = Callback.from(()->
{ {
channelState.onOpen(); channelState.onOpen();
if (!demanding) if (!demanding)
@ -450,6 +452,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
catch (Throwable t) catch (Throwable t)
{ {
openCallback.failed(t); openCallback.failed(t);
/* This is double handling of the exception but we need to do this because we have two separate
mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback
and the CompletableFuture require the exception. */
throw new RuntimeException(t);
} }
} }
@ -481,9 +488,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{ {
assertValidIncoming(frame); assertValidIncoming(frame);
} }
catch (Throwable ex) catch (Throwable t)
{ {
callback.failed(ex); callback.failed(t);
return; return;
} }
@ -497,9 +504,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
{ {
assertValidOutgoing(frame); assertValidOutgoing(frame);
} }
catch (Throwable ex) catch (Throwable t)
{ {
callback.failed(ex); callback.failed(t);
return; return;
} }
@ -517,13 +524,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
Callback closeConnectionCallback = Callback.from( Callback closeConnectionCallback = Callback.from(
()->closeConnection(cause, channelState.getCloseStatus(), callback), ()->closeConnection(cause, channelState.getCloseStatus(), callback),
x->closeConnection(cause, channelState.getCloseStatus(), Callback.from( t->closeConnection(cause, channelState.getCloseStatus(), Callback.from(callback, t)));
()-> callback.failed(x),
x2->
{
x.addSuppressed(x2);
callback.failed(x);
})));
flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false)); flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false));
} }
@ -534,24 +535,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio
} }
flusher.iterate(); flusher.iterate();
} }
catch (Throwable ex) catch (Throwable t)
{ {
if (frame.getOpCode() == OpCode.CLOSE) if (frame.getOpCode() == OpCode.CLOSE)
{ {
CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); CloseStatus closeStatus = CloseStatus.getCloseStatus(frame);
if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus)) if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus))
closeConnection(null, closeStatus, Callback.from( closeConnection(AbnormalCloseStatus.getCause(closeStatus), closeStatus, Callback.from(callback, t));
()->callback.failed(ex),
x2->
{
ex.addSuppressed(x2);
callback.failed(ex);
}));
else else
callback.failed(ex); callback.failed(t);
} }
else else
callback.failed(ex); callback.failed(t);
} }
} }