diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index ca81bd4791f..75d9d7d42c5 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -210,6 +210,31 @@ public interface Callback extends Invocable }; } + /** + * Create a nested callback which always fails the nested callback on completion. + * @param callback The nested callback + * @param cause The cause to fail the nested callback, if the new callback is failed the reason + * will be added to this cause as a suppressed exception. + * @return a new callback. + */ + static Callback from(Callback callback, Throwable cause) + { + return new Callback() + { + @Override + public void succeeded() + { + callback.failed(cause); + } + + @Override + public void failed(Throwable x) + { + cause.addSuppressed(x); + callback.failed(cause); + } + }; + } class Completing implements Callback { diff --git a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index 78f5e77d98d..00ecfb1fede 100644 --- a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -135,7 +135,8 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli public CompletableFuture connect(Object websocket, URI toUri, UpgradeRequest request, UpgradeListener listener) throws IOException { JettyClientUpgradeRequest upgradeRequest = new JettyClientUpgradeRequest(this, coreClient, request, toUri, websocket); - upgradeRequest.addListener(listener); + if (listener != null) + upgradeRequest.addListener(listener); coreClient.connect(upgradeRequest); return upgradeRequest.getFutureSession(); } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java index ccb1136ee52..4859449f6be 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java @@ -66,8 +66,13 @@ public interface FrameHandler extends IncomingFrames /** * Async notification that Connection is being opened. *

- * FrameHandler can write during this call, but will not receive frames until - * the onOpen() completes. + * FrameHandler can write during this call, but can not receive frames until the callback is succeeded. + *

+ *

+ * If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if + * not demanding, or can now call {@link CoreSession#demand(long)} to receive frames if demanding. + * If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and + *the connection will be closed.
*

* * @param coreSession the channel associated with this connection. @@ -81,9 +86,8 @@ public interface FrameHandler extends IncomingFrames * sequentially to satisfy all outstanding demand signaled by calls to * {@link CoreSession#demand(long)}. * Control and Data frames are passed to this method. - * Control frames that require a response (eg PING and CLOSE) may be responded to by the - * the handler, but if an appropriate response is not sent once the callback is succeeded, - * then a response will be generated and sent. + * Close frames may be responded to by the handler, but if an appropriate close response is not + * sent once the callback is succeeded, then a response close will be generated and sent. * * @param frame the raw frame * @param callback the callback to indicate success in processing frame (or failure) @@ -93,7 +97,8 @@ public interface FrameHandler extends IncomingFrames /** * An error has occurred or been detected in websocket-core and being reported to FrameHandler. * A call to onError will be followed by a call to {@link #onClosed(CloseStatus, Callback)} giving the close status - * derived from the error. + * derived from the error. This will not be called more than once, {@link #onClosed(CloseStatus, Callback)} + * will be called on the callback completion. * * @param cause the reason for the error * @param callback the callback to indicate success in processing (or failure) @@ -105,6 +110,7 @@ public interface FrameHandler extends IncomingFrames *

* The connection is now closed, no reading or writing is possible anymore. * Implementations of FrameHandler can cleanup their resources for this connection now. + * This method will be called only once. *

* * @param closeStatus the close status received from remote, or in the case of abnormal closure from local. diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 1fb3412c725..066fc6ce36f 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -395,9 +395,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio CloseStatus closeStatus = abnormalCloseStatusFor(cause); if (closeStatus.getCode() == CloseStatus.PROTOCOL) - close(closeStatus, NOOP); + close(closeStatus, callback); else if (channelState.onClosed(closeStatus)) closeConnection(cause, closeStatus, callback); + else + callback.failed(cause); } /** @@ -428,7 +430,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio if (LOG.isDebugEnabled()) LOG.debug("ConnectionState: Transition to CONNECTED"); - Callback openCallback = Callback.from(()-> + Callback openCallback = Callback.from(()-> { channelState.onOpen(); if (!demanding) @@ -450,6 +452,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio catch (Throwable t) { openCallback.failed(t); + + /* This is double handling of the exception but we need to do this because we have two separate + mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback + and the CompletableFuture require the exception. */ + throw new RuntimeException(t); } } @@ -481,9 +488,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { assertValidIncoming(frame); } - catch (Throwable ex) + catch (Throwable t) { - callback.failed(ex); + callback.failed(t); return; } @@ -497,9 +504,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { assertValidOutgoing(frame); } - catch (Throwable ex) + catch (Throwable t) { - callback.failed(ex); + callback.failed(t); return; } @@ -517,13 +524,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio Callback closeConnectionCallback = Callback.from( ()->closeConnection(cause, channelState.getCloseStatus(), callback), - x->closeConnection(cause, channelState.getCloseStatus(), Callback.from( - ()-> callback.failed(x), - x2-> - { - x.addSuppressed(x2); - callback.failed(x); - }))); + t->closeConnection(cause, channelState.getCloseStatus(), Callback.from(callback, t))); flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false)); } @@ -534,24 +535,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } flusher.iterate(); } - catch (Throwable ex) + catch (Throwable t) { if (frame.getOpCode() == OpCode.CLOSE) { CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus)) - closeConnection(null, closeStatus, Callback.from( - ()->callback.failed(ex), - x2-> - { - ex.addSuppressed(x2); - callback.failed(ex); - })); + closeConnection(AbnormalCloseStatus.getCause(closeStatus), closeStatus, Callback.from(callback, t)); else - callback.failed(ex); + callback.failed(t); } else - callback.failed(ex); + callback.failed(t); } }