diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index ca81bd4791f..75d9d7d42c5 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -210,6 +210,31 @@ public interface Callback extends Invocable }; } + /** + * Create a nested callback which always fails the nested callback on completion. + * @param callback The nested callback + * @param cause The cause to fail the nested callback, if the new callback is failed the reason + * will be added to this cause as a suppressed exception. + * @return a new callback. + */ + static Callback from(Callback callback, Throwable cause) + { + return new Callback() + { + @Override + public void succeeded() + { + callback.failed(cause); + } + + @Override + public void failed(Throwable x) + { + cause.addSuppressed(x); + callback.failed(cause); + } + }; + } class Completing implements Callback { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java index ccb1136ee52..4859449f6be 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java @@ -66,8 +66,13 @@ public interface FrameHandler extends IncomingFrames /** * Async notification that Connection is being opened. *
- * FrameHandler can write during this call, but will not receive frames until - * the onOpen() completes. + * FrameHandler can write during this call, but can not receive frames until the callback is succeeded. + *
+ *
+ * If the FrameHandler succeeds the callback we transition to OPEN state and can now receive frames if
+ * not demanding, or can now call {@link CoreSession#demand(long)} to receive frames if demanding.
+ * If the FrameHandler fails the callback a close frame will be sent with {@link CloseStatus#SERVER_ERROR} and
+ *the connection will be closed.
*
* The connection is now closed, no reading or writing is possible anymore. * Implementations of FrameHandler can cleanup their resources for this connection now. + * This method will be called only once. *
* * @param closeStatus the close status received from remote, or in the case of abnormal closure from local. diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 1fb3412c725..066fc6ce36f 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -395,9 +395,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio CloseStatus closeStatus = abnormalCloseStatusFor(cause); if (closeStatus.getCode() == CloseStatus.PROTOCOL) - close(closeStatus, NOOP); + close(closeStatus, callback); else if (channelState.onClosed(closeStatus)) closeConnection(cause, closeStatus, callback); + else + callback.failed(cause); } /** @@ -428,7 +430,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio if (LOG.isDebugEnabled()) LOG.debug("ConnectionState: Transition to CONNECTED"); - Callback openCallback = Callback.from(()-> + Callback openCallback = Callback.from(()-> { channelState.onOpen(); if (!demanding) @@ -450,6 +452,11 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio catch (Throwable t) { openCallback.failed(t); + + /* This is double handling of the exception but we need to do this because we have two separate + mechanisms for returning the CoreSession, onOpen and the CompletableFuture and both the onOpen callback + and the CompletableFuture require the exception. */ + throw new RuntimeException(t); } } @@ -481,9 +488,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { assertValidIncoming(frame); } - catch (Throwable ex) + catch (Throwable t) { - callback.failed(ex); + callback.failed(t); return; } @@ -497,9 +504,9 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio { assertValidOutgoing(frame); } - catch (Throwable ex) + catch (Throwable t) { - callback.failed(ex); + callback.failed(t); return; } @@ -517,13 +524,7 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio Callback closeConnectionCallback = Callback.from( ()->closeConnection(cause, channelState.getCloseStatus(), callback), - x->closeConnection(cause, channelState.getCloseStatus(), Callback.from( - ()-> callback.failed(x), - x2-> - { - x.addSuppressed(x2); - callback.failed(x); - }))); + t->closeConnection(cause, channelState.getCloseStatus(), Callback.from(callback, t))); flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false)); } @@ -534,24 +535,18 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio } flusher.iterate(); } - catch (Throwable ex) + catch (Throwable t) { if (frame.getOpCode() == OpCode.CLOSE) { CloseStatus closeStatus = CloseStatus.getCloseStatus(frame); if (closeStatus instanceof AbnormalCloseStatus && channelState.onClosed(closeStatus)) - closeConnection(null, closeStatus, Callback.from( - ()->callback.failed(ex), - x2-> - { - ex.addSuppressed(x2); - callback.failed(ex); - })); + closeConnection(AbnormalCloseStatus.getCause(closeStatus), closeStatus, Callback.from(callback, t)); else - callback.failed(ex); + callback.failed(t); } else - callback.failed(ex); + callback.failed(t); } }