From 84d74ba1ded7b7b72bc20a08612728af2e3bb317 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 29 Jan 2019 10:33:37 +1100 Subject: [PATCH] Issue #3290 async onOpen, onClose and onError Changes after review: + removed Adaptor from FrameHandler, so all non test usages of FrameHandler now use async API. + Fixed sequencing of multiple async operation so callback is notified after completion (created more Callback.from utilities for this) + fixed import order Signed-off-by: Greg Wilkins --- .../org/eclipse/jetty/server/HttpChannel.java | 43 ++-- .../java/org/eclipse/jetty/util/Callback.java | 77 ++++++- .../client/JavaxWebSocketClientContainer.java | 39 +++- .../common/JavaxWebSocketFrameHandler.java | 214 ++++++++++-------- .../javax/common/JavaxWebSocketSession.java | 6 +- ...avaxWebSocketFrameHandler_OnCloseTest.java | 17 +- ...avaxWebSocketFrameHandler_OnErrorTest.java | 14 +- ...ameHandler_OnMessage_BinaryStreamTest.java | 19 +- ...cketFrameHandler_OnMessage_BinaryTest.java | 17 +- ...FrameHandler_OnMessage_TextStreamTest.java | 17 +- ...SocketFrameHandler_OnMessage_TextTest.java | 17 +- ...JavaxWebSocketFrameHandler_OnOpenTest.java | 12 +- .../websocket/javax/tests/NetworkFuzzer.java | 46 ++-- .../javax/tests/framehandlers/FrameEcho.java | 22 +- .../framehandlers/FrameHandlerTracker.java | 48 ++-- .../tests/client/MessageReceivingTest.java | 41 ++-- .../javax/tests/client/OnCloseTest.java | 14 +- .../client/SessionAddMessageHandlerTest.java | 23 +- ...FrameHandler_OnMessage_TextStreamTest.java | 4 +- .../common/JettyWebSocketFrameHandler.java | 211 +++++++++-------- .../JettyWebSocketFrameHandlerTest.java | 18 +- .../jetty/websocket/core/FrameHandler.java | 80 +------ .../jetty/websocket/core/MessageHandler.java | 92 ++++---- .../core/internal/WebSocketChannel.java | 70 +++--- .../core/AbstractTestFrameHandler.java | 6 +- .../websocket/core/MessageHandlerTest.java | 14 +- .../core/SynchronousFrameHandler.java | 87 +++++++ .../websocket/core/TestFrameHandler.java | 10 +- .../websocket/core/WebSocketCloseTest.java | 17 +- .../websocket/core/WebSocketOpenTest.java | 28 ++- .../core/chat/ChatWebSocketServer.java | 27 ++- 31 files changed, 768 insertions(+), 582 deletions(-) create mode 100644 jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index a2ee5e8eb33..b52b941c1ba 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -18,20 +18,6 @@ package org.eclipse.jetty.server; -import org.eclipse.jetty.http.*; -import org.eclipse.jetty.io.*; -import org.eclipse.jetty.server.HttpChannelState.Action; -import org.eclipse.jetty.server.handler.ContextHandler; -import org.eclipse.jetty.server.handler.ErrorHandler; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Scheduler; - -import javax.servlet.DispatcherType; -import javax.servlet.RequestDispatcher; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; @@ -46,6 +32,33 @@ import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; +import javax.servlet.DispatcherType; +import javax.servlet.RequestDispatcher; + +import org.eclipse.jetty.http.BadMessageException; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpGenerator; +import org.eclipse.jetty.http.HttpHeader; +import org.eclipse.jetty.http.HttpMethod; +import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.io.ChannelEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.QuietException; +import org.eclipse.jetty.server.HttpChannelState.Action; +import org.eclipse.jetty.server.handler.ContextHandler; +import org.eclipse.jetty.server.handler.ErrorHandler; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.SharedBlockingCallback.Blocker; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.util.thread.Scheduler; + /** * HttpChannel represents a single endpoint for HTTP semantic processing. * The HttpChannel is both a HttpParser.RequestHandler, where it passively receives events from @@ -561,8 +574,6 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor */ protected void handleException(Throwable failure) { - LOG.warn(failure); - // Unwrap wrapping Jetty and Servlet exceptions. Throwable quiet = unwrap(failure, QuietException.class); Throwable no_stack = unwrap(failure, BadMessageException.class, IOException.class, TimeoutException.class); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java index 0f1740f05e6..ca81bd4791f 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Callback.java @@ -111,6 +111,12 @@ public interface Callback extends Invocable }; } + /** + * Create a callback from the passed success and failure + * @param success Called when the callback succeeds + * @param failure Called when the callback fails + * @return a new Callback + */ static Callback from(Runnable success, Consumer failure) { return new Callback() @@ -129,6 +135,10 @@ public interface Callback extends Invocable }; } + /** Creaste a callback that runs completed when it succeeds or fails + * @param completed The completion to run on success or failure + * @return a new callback + */ static Callback from(Runnable completed) { return new Completing() @@ -140,6 +150,67 @@ public interface Callback extends Invocable }; } + /** + * Create a nested callback that runs completed after + * completing the nested callback. + * @param callback The nested callback + * @param completed The completion to run after the nested callback is completed + * @return a new callback. + */ + static Callback from(Callback callback, Runnable completed) + { + return new Nested(callback) + { + public void completed() + { + completed.run(); + } + }; + } + + /** + * Create a nested callback that runs completed before + * completing the nested callback. + * @param callback The nested callback + * @param completed The completion to run before the nested callback is completed. Any exceptions thrown + * from completed will result in a callback failure. + * @return a new callback. + */ + static Callback from(Runnable completed, Callback callback) + { + return new Callback() + { + @Override + public void succeeded() + { + try + { + completed.run(); + callback.succeeded(); + } + catch(Throwable t) + { + callback.failed(t); + } + } + + @Override + public void failed(Throwable x) + { + try + { + completed.run(); + } + catch(Throwable t) + { + x.addSuppressed(t); + } + callback.failed(x); + } + }; + } + + class Completing implements Callback { @Override @@ -158,7 +229,11 @@ public interface Callback extends Invocable { } } - + + /** + * Nested Completing Callback that completes after + * completing the nested callback + */ class Nested extends Completing { private final Callback callback; diff --git a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java index f538c5e4cef..e43a2fec36a 100644 --- a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java +++ b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java @@ -18,15 +18,6 @@ package org.eclipse.jetty.websocket.javax.client; -import org.eclipse.jetty.client.HttpClient; -import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.DecoratedObjectFactory; -import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; -import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; -import org.eclipse.jetty.websocket.javax.common.*; - -import javax.websocket.*; import java.io.IOException; import java.net.URI; import java.util.Objects; @@ -34,8 +25,29 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import java.util.function.Supplier; +import javax.websocket.ClientEndpoint; +import javax.websocket.ClientEndpointConfig; +import javax.websocket.DeploymentException; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Extension; +import javax.websocket.Session; + +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.DecoratedObjectFactory; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.websocket.core.WebSocketExtensionRegistry; +import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; +import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint; +import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException; +import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer; +import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig; +import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory; + /** * Container for Client use of the javax.websocket API. *

@@ -161,12 +173,15 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple try { Future sessionFuture = connect(upgradeRequest); - long timeout = getDefaultMaxSessionIdleTimeout(); + long timeout = coreClient.getHttpClient().getConnectTimeout(); if (timeout>0) - return sessionFuture.get(timeout, TimeUnit.MILLISECONDS); - + return sessionFuture.get(timeout+1000, TimeUnit.MILLISECONDS); return sessionFuture.get(); } + catch (TimeoutException e) + { + throw new IOException("Connection future not completed " + destURI, e); + } catch (Exception e) { throw new IOException("Unable to connect to " + destURI, e); diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java index 6bde0f123a0..066c0ab1292 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java @@ -18,26 +18,47 @@ package org.eclipse.jetty.websocket.javax.common; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.core.*; -import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders; -import org.eclipse.jetty.websocket.javax.common.messages.*; -import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils; - -import javax.websocket.MessageHandler; -import javax.websocket.*; import java.lang.invoke.MethodHandle; import java.lang.invoke.MethodHandles; import java.lang.invoke.MethodType; import java.nio.ByteBuffer; -import java.util.*; +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; -public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor +import javax.websocket.CloseReason; +import javax.websocket.Decoder; +import javax.websocket.EndpointConfig; +import javax.websocket.MessageHandler; +import javax.websocket.PongMessage; +import javax.websocket.Session; + +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.ProtocolException; +import org.eclipse.jetty.websocket.core.WebSocketConstants; +import org.eclipse.jetty.websocket.core.WebSocketException; +import org.eclipse.jetty.websocket.javax.common.decoders.AvailableDecoders; +import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryMessageSink; +import org.eclipse.jetty.websocket.javax.common.messages.DecodedBinaryStreamMessageSink; +import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextMessageSink; +import org.eclipse.jetty.websocket.javax.common.messages.DecodedTextStreamMessageSink; +import org.eclipse.jetty.websocket.javax.common.messages.PartialByteArrayMessageSink; +import org.eclipse.jetty.websocket.javax.common.messages.PartialByteBufferMessageSink; +import org.eclipse.jetty.websocket.javax.common.messages.PartialStringMessageSink; +import org.eclipse.jetty.websocket.javax.common.util.InvokerUtils; + +public class JavaxWebSocketFrameHandler implements FrameHandler { private final Logger LOG; private final JavaxWebSocketContainer container; @@ -171,106 +192,57 @@ public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor } @Override - public void onClosed(CloseStatus closeStatus) + public void onOpen(CoreSession coreSession, Callback callback) { - if (closeHandle != null) - { - try - { - CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason()); - closeHandle.invoke(closeReason); - } - catch (Throwable cause) - { - throw new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause); - } - } - - container.removeBean(session); - } - - @SuppressWarnings("Duplicates") - @Override - public void onError(Throwable cause) - { - futureSession.completeExceptionally(cause); - - if (errorHandle == null) - { - LOG.warn("Unhandled Error: " + endpointInstance, cause); - return; - } - try { - errorHandle.invoke(cause); - } - catch (Throwable t) - { - WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t); - wsError.addSuppressed(cause); - throw wsError; - } - } + this.coreSession = coreSession; + session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig); - @Override - public void onOpen(CoreSession coreSession) throws Exception - { - this.coreSession = coreSession; - session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig); + openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig); + closeHandle = InvokerUtils.bindTo(closeHandle, session); + errorHandle = InvokerUtils.bindTo(errorHandle, session); - openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig); - closeHandle = InvokerUtils.bindTo(closeHandle, session); - errorHandle = InvokerUtils.bindTo(errorHandle, session); + JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata); + JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata); - JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualTextMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(textMetadata); - JavaxWebSocketFrameHandlerMetadata.MessageMetadata actualBinaryMetadata = JavaxWebSocketFrameHandlerMetadata.MessageMetadata.copyOf(binaryMetadata); + pongHandle = InvokerUtils.bindTo(pongHandle, session); - pongHandle = InvokerUtils.bindTo(pongHandle, session); - - if (actualTextMetadata != null) - { - actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session); - actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session); - textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata); - - textMetadata = actualTextMetadata; - } - - if (actualBinaryMetadata != null) - { - actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session); - actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session); - binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata); - - binaryMetadata = actualBinaryMetadata; - } - - if (openHandle != null) - { - try + if (actualTextMetadata != null) { + actualTextMetadata.handle = InvokerUtils.bindTo(actualTextMetadata.handle, endpointInstance, endpointConfig, session); + actualTextMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualTextMetadata.handle, session); + textSink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualTextMetadata); + + textMetadata = actualTextMetadata; + } + + if (actualBinaryMetadata != null) + { + actualBinaryMetadata.handle = InvokerUtils.bindTo(actualBinaryMetadata.handle, endpointInstance, endpointConfig, session); + actualBinaryMetadata.handle = JavaxWebSocketFrameHandlerFactory.wrapNonVoidReturnType(actualBinaryMetadata.handle, session); + binarySink = JavaxWebSocketFrameHandlerFactory.createMessageSink(session, actualBinaryMetadata); + + binaryMetadata = actualBinaryMetadata; + } + + if (openHandle != null) openHandle.invoke(); - } - catch (Throwable cause) - { - Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause); - // TODO This feels like double handling of the exception? Review need for futureSession - futureSession.completeExceptionally(wse); - throw wse; - } + container.addBean(session, true); + futureSession.complete(session); + callback.succeeded(); } + catch (Throwable cause) + { + Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause); - container.addBean(session, true); - futureSession.complete(session); + // TODO This feels like double handling of the exception? Review need for futureSession + futureSession.completeExceptionally(wse); + callback.failed(wse); + } } - /** - * @see #onFrame(Frame,Callback) - */ - public final void onFrame(Frame frame) {} - @Override public void onFrame(Frame frame, Callback callback) { @@ -302,6 +274,50 @@ public class JavaxWebSocketFrameHandler implements FrameHandler.Adaptor dataType = OpCode.UNDEFINED; } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + try + { + if (closeHandle != null) + { + CloseReason closeReason = new CloseReason(CloseReason.CloseCodes.getCloseCode(closeStatus.getCode()), closeStatus.getReason()); + closeHandle.invoke(closeReason); + } + container.removeBean(session); + callback.succeeded(); + } + catch (Throwable cause) + { + callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " CLOSE method error: " + cause.getMessage(), cause)); + } + } + + @Override + public void onError(Throwable cause, Callback callback) + { + try + { + futureSession.completeExceptionally(cause); + + if (errorHandle != null) + errorHandle.invoke(cause); + else + LOG.warn("Unhandled Error: " + endpointInstance, cause); + callback.succeeded(); + } + catch (Throwable t) + { + WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t); + wsError.addSuppressed(cause); + callback.failed(wsError); + // TODO should futureSession be failed here? + } + } + + + public Set getMessageHandlers() { if (messageHandlerMap.isEmpty()) diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java index d63666d2a26..85f0ea66b34 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java @@ -220,10 +220,10 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we { getBasicRemote().sendObject(obj); } - catch (Throwable cause) + catch (Exception cause) { - // TODO: need way to fail Channel. - frameHandler.onError(cause); + // TODO review this + throw new RuntimeException(cause); } } } diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java index 2a32e029b66..8f87f4a18c3 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnCloseTest.java @@ -18,6 +18,13 @@ package org.eclipse.jetty.websocket.javax.common; +import java.util.concurrent.TimeUnit; + +import javax.websocket.ClientEndpoint; +import javax.websocket.CloseReason; +import javax.websocket.OnClose; +import javax.websocket.Session; + import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.Frame; @@ -25,12 +32,6 @@ import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; -import javax.websocket.ClientEndpoint; -import javax.websocket.CloseReason; -import javax.websocket.OnClose; -import javax.websocket.Session; -import java.util.concurrent.TimeUnit; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -44,12 +45,12 @@ public class JavaxWebSocketFrameHandler_OnCloseTest extends AbstractJavaxWebSock JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // These invocations are the same for all tests - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); CloseStatus status = new CloseStatus(CloseStatus.NORMAL, "Normal"); Frame closeFrame = status.toFrame(); localEndpoint.onFrame(closeFrame, Callback.from(() -> { - localEndpoint.onClosed(status); + localEndpoint.onClosed(status, Callback.NOOP); }, t -> { throw new RuntimeException(t); diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java index c91bf81ec19..eaf62d31d26 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnErrorTest.java @@ -18,14 +18,16 @@ package org.eclipse.jetty.websocket.javax.common; -import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; -import org.hamcrest.Matcher; -import org.junit.jupiter.api.Test; +import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpoint; import javax.websocket.OnError; import javax.websocket.Session; -import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; +import org.hamcrest.Matcher; +import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; @@ -40,8 +42,8 @@ public class JavaxWebSocketFrameHandler_OnErrorTest extends AbstractJavaxWebSock JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // These invocations are the same for all tests - localEndpoint.onOpen(channel); - localEndpoint.onError(new RuntimeException("From Testcase")); + localEndpoint.onOpen(channel, Callback.NOOP); + localEndpoint.onError(new RuntimeException("From Testcase"), Callback.NOOP); String event = socket.events.poll(1, TimeUnit.SECONDS); assertThat("Event", event, eventMatcher); } diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java index 1c60f3f0499..6e53a2bbca1 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest.java @@ -18,6 +18,15 @@ package org.eclipse.jetty.websocket.javax.common; +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.websocket.ClientEndpoint; +import javax.websocket.OnMessage; + import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.core.Frame; @@ -25,14 +34,6 @@ import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import org.junit.jupiter.api.Test; -import javax.websocket.ClientEndpoint; -import javax.websocket.OnMessage; -import java.io.IOException; -import java.io.InputStream; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -44,7 +45,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_BinaryStreamTest extends Abstr JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // This invocation is the same for all tests - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); func.apply(localEndpoint); diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java index 225d21992b9..f50220a502b 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_BinaryTest.java @@ -18,6 +18,14 @@ package org.eclipse.jetty.websocket.javax.common; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import javax.websocket.ClientEndpoint; +import javax.websocket.OnMessage; +import javax.websocket.Session; + import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.Frame; @@ -27,13 +35,6 @@ import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; -import javax.websocket.ClientEndpoint; -import javax.websocket.OnMessage; -import javax.websocket.Session; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; - import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; @@ -47,7 +48,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_BinaryTest extends AbstractJav JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // This invocation is the same for all tests - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); assertThat("Has Binary Metadata", localEndpoint.getBinaryMetadata(), notNullValue()); diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java index a036519ffe2..b689aa387ee 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java @@ -18,6 +18,14 @@ package org.eclipse.jetty.websocket.javax.common; +import java.io.IOException; +import java.io.Reader; +import java.util.concurrent.TimeUnit; +import java.util.function.Function; + +import javax.websocket.ClientEndpoint; +import javax.websocket.OnMessage; + import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.core.Frame; @@ -25,13 +33,6 @@ import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; import org.junit.jupiter.api.Test; -import javax.websocket.ClientEndpoint; -import javax.websocket.OnMessage; -import java.io.IOException; -import java.io.Reader; -import java.util.concurrent.TimeUnit; -import java.util.function.Function; - import static org.hamcrest.CoreMatchers.is; import static org.hamcrest.MatcherAssert.assertThat; @@ -43,7 +44,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // This invocation is the same for all tests - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); func.apply(localEndpoint); diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java index 6e47087cd2a..8ceb54c2801 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnMessage_TextTest.java @@ -18,6 +18,14 @@ package org.eclipse.jetty.websocket.javax.common; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.concurrent.TimeUnit; + +import javax.websocket.ClientEndpoint; +import javax.websocket.OnMessage; +import javax.websocket.Session; + import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.Frame; @@ -27,13 +35,6 @@ import org.eclipse.jetty.websocket.javax.common.util.InvalidSignatureException; import org.hamcrest.Matcher; import org.junit.jupiter.api.Test; -import javax.websocket.ClientEndpoint; -import javax.websocket.OnMessage; -import javax.websocket.Session; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.concurrent.TimeUnit; - import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; import static org.hamcrest.Matchers.containsString; @@ -46,7 +47,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextTest extends AbstractJavax JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // This invocation is the same for all tests - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); ByteBuffer payload = BufferUtil.toBuffer(msg, StandardCharsets.UTF_8); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload(payload).setFin(true), Callback.NOOP); diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java index b031044dd84..22d0a7800db 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler_OnOpenTest.java @@ -18,14 +18,16 @@ package org.eclipse.jetty.websocket.javax.common; -import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; -import org.hamcrest.Matcher; -import org.junit.jupiter.api.Test; +import java.util.concurrent.TimeUnit; import javax.websocket.ClientEndpoint; import javax.websocket.OnOpen; import javax.websocket.Session; -import java.util.concurrent.TimeUnit; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.javax.common.sockets.TrackingSocket; +import org.hamcrest.Matcher; +import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.allOf; @@ -38,7 +40,7 @@ public class JavaxWebSocketFrameHandler_OnOpenTest extends AbstractJavaxWebSocke JavaxWebSocketFrameHandler localEndpoint = newJavaxFrameHandler(socket); // This invocation is the same for all tests - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); String event = socket.events.poll(1, TimeUnit.SECONDS); assertThat("Event", event, eventMatcher); } diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java index 75c46a81439..5238b8882c2 100644 --- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java +++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java @@ -18,6 +18,16 @@ package org.eclipse.jetty.websocket.javax.tests; +import java.io.IOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Map; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + import org.eclipse.jetty.client.HttpResponse; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.io.EndPoint; @@ -32,16 +42,6 @@ import org.eclipse.jetty.websocket.core.client.UpgradeRequest; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.internal.Generator; -import java.io.IOException; -import java.net.URI; -import java.nio.ByteBuffer; -import java.util.List; -import java.util.Map; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.TimeUnit; - public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable { private final LocalServer server; @@ -217,7 +217,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab } } - public static class FrameCapture implements FrameHandler.Adaptor + public static class FrameCapture implements FrameHandler { private final BlockingQueue receivedFrames = new LinkedBlockingQueue<>(); private final EndPoint endPoint; @@ -229,32 +229,34 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab this.endPoint = endPoint; } - @Override - public void onClosed(CloseStatus closeStatus) - { - } @Override - public void onError(Throwable cause) throws Exception + public void onOpen(CoreSession coreSession, Callback callback) { + this.session = coreSession; + callback.succeeded(); } @Override public void onFrame(Frame frame, Callback callback) { receivedFrames.offer(Frame.copy(frame)); - synchronized(this) - { - callback.succeeded(); - } + callback.succeeded(); } @Override - public void onOpen(CoreSession coreSession) throws Exception + public void onError(Throwable cause, Callback callback) { - this.session = coreSession; + callback.succeeded(); } + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + callback.succeeded(); + } + + public void writeRaw(ByteBuffer buffer) throws IOException { synchronized (this) diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java index 706b4b398cb..c5bbc0738d0 100644 --- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java +++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameEcho.java @@ -25,16 +25,17 @@ import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.FrameHandler; -public class FrameEcho implements FrameHandler.Adaptor +public class FrameEcho implements FrameHandler { private Logger LOG = Log.getLogger(FrameEcho.class); private CoreSession coreSession; @Override - public void onOpen(CoreSession coreSession) throws Exception + public void onOpen(CoreSession coreSession, Callback callback) { this.coreSession = coreSession; + callback.succeeded(); } @Override @@ -47,15 +48,18 @@ public class FrameEcho implements FrameHandler.Adaptor } @Override - public void onClosed(CloseStatus closeStatus) - { - coreSession = null; - } - - @Override - public void onError(Throwable cause) throws Exception + public void onError(Throwable cause, Callback callback) { if (LOG.isDebugEnabled()) LOG.debug(this + " onError ", cause); + callback.succeeded(); } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + coreSession = null; + callback.succeeded(); + } + } diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java index ecb117df497..4df016bbcaa 100644 --- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java +++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/framehandlers/FrameHandlerTracker.java @@ -18,17 +18,17 @@ package org.eclipse.jetty.websocket.javax.tests.framehandlers; -import org.eclipse.jetty.util.BufferUtil; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.websocket.core.CloseStatus; -import org.eclipse.jetty.websocket.core.MessageHandler; - import java.nio.ByteBuffer; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicReference; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.MessageHandler; + public class FrameHandlerTracker extends MessageHandler { public CountDownLatch openLatch = new CountDownLatch(1); @@ -45,26 +45,9 @@ public class FrameHandlerTracker extends MessageHandler } @Override - public void onOpen(CoreSession coreSession) throws Exception + public void onOpen(CoreSession coreSession, Callback callback) { - super.onOpen(coreSession); - openLatch.countDown(); - } - - @Override - public void onClosed(CloseStatus closeStatus) throws Exception - { - super.onClosed(closeStatus); - - closeDetail.compareAndSet(null, closeStatus); - closeLatch.countDown(); - } - - @Override - public void onError(Throwable cause) throws Exception - { - super.onError(cause); - error.compareAndSet(null, cause); + super.onOpen(coreSession, Callback.from(callback,()->openLatch.countDown())); } @Override @@ -80,4 +63,21 @@ public class FrameHandlerTracker extends MessageHandler bufferQueue.offer(BufferUtil.copy(wholeMessage)); callback.succeeded(); } + + @Override + public void onError(Throwable cause, Callback callback) + { + super.onError(cause, Callback.from(callback, ()-> error.compareAndSet(null, cause))); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + super.onClosed(closeStatus, Callback.from(callback,()-> + { + closeDetail.compareAndSet(null, closeStatus); + closeLatch.countDown(); + })); + } + } diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java index 5a71d957825..5b051743685 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/MessageReceivingTest.java @@ -18,6 +18,23 @@ package org.eclipse.jetty.websocket.javax.tests.client; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import javax.websocket.ClientEndpointConfig; +import javax.websocket.ContainerProvider; +import javax.websocket.Endpoint; +import javax.websocket.EndpointConfig; +import javax.websocket.Session; +import javax.websocket.WebSocketContainer; + import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.component.LifeCycle; @@ -28,31 +45,15 @@ import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.MessageHandler; import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.core.server.Negotiation; +import org.eclipse.jetty.websocket.javax.common.util.TextUtil; import org.eclipse.jetty.websocket.javax.tests.CoreServer; import org.eclipse.jetty.websocket.javax.tests.DataUtils; -import org.eclipse.jetty.websocket.javax.common.util.TextUtil; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; -import javax.websocket.ClientEndpointConfig; -import javax.websocket.ContainerProvider; -import javax.websocket.Endpoint; -import javax.websocket.EndpointConfig; -import javax.websocket.Session; -import javax.websocket.WebSocketContainer; -import java.io.IOException; -import java.nio.ByteBuffer; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.List; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingDeque; -import java.util.concurrent.TimeUnit; - import static java.nio.charset.StandardCharsets.UTF_8; import static org.hamcrest.CoreMatchers.instanceOf; import static org.hamcrest.CoreMatchers.is; @@ -292,9 +293,10 @@ public class MessageReceivingTest } @Override - public void onError(Throwable cause) + public void onError(Throwable cause, Callback callback) { LOG.warn(cause); + callback.succeeded(); } } @@ -323,9 +325,10 @@ public class MessageReceivingTest } @Override - public void onError(Throwable cause) + public void onError(Throwable cause, Callback callback) { LOG.warn(cause); + callback.succeeded(); } } diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java index 72e5fe952e1..a60540331e5 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java @@ -30,19 +30,19 @@ import javax.websocket.Session; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.FrameHandler; -import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; -import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket; -import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket; +import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig; +import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; -import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig; -import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; +import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket; +import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionReasonSocket; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSessionSocket; +import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseSocket; import org.hamcrest.Matchers; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; @@ -112,11 +112,11 @@ public class OnCloseTest CompletableFuture futureSession = new CompletableFuture<>(); JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession); - frameHandler.onOpen(new FrameHandler.CoreSession.Empty()); + frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP); // Execute onClose call frameHandler.onFrame(CloseStatus.toFrame(CloseStatus.NORMAL), Callback.NOOP); - frameHandler.onClosed(CloseStatus.NORMAL_STATUS); + frameHandler.onClosed(CloseStatus.NORMAL_STATUS, Callback.NOOP); // Test captured event BlockingQueue events = endpoint.events; diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java index 979da223f60..c2344a5e270 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java @@ -33,12 +33,9 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.OpCode; -import org.eclipse.jetty.websocket.javax.tests.MessageType; -import org.eclipse.jetty.websocket.javax.tests.SessionMatchers; -import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler; -import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler; -import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler; -import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler; +import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig; +import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; +import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory; import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory; @@ -47,20 +44,20 @@ import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; -import org.eclipse.jetty.websocket.javax.client.EmptyClientEndpointConfig; -import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; -import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientFrameHandlerFactory; +import org.eclipse.jetty.websocket.javax.tests.MessageType; +import org.eclipse.jetty.websocket.javax.tests.SessionMatchers; +import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler; +import org.eclipse.jetty.websocket.javax.tests.handlers.ByteBufferPartialHandler; +import org.eclipse.jetty.websocket.javax.tests.handlers.LongMessageHandler; +import org.eclipse.jetty.websocket.javax.tests.handlers.StringWholeHandler; import org.hamcrest.Matchers; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.allOf; -import static org.hamcrest.Matchers.hasItem; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.hamcrest.Matchers.not; public class SessionAddMessageHandlerTest { @@ -88,7 +85,7 @@ public class SessionAddMessageHandlerTest JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container); CompletableFuture futureSession = new CompletableFuture<>(); frameHandler = frameHandlerFactory.newJavaxFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession); - frameHandler.onOpen(new FrameHandler.CoreSession.Empty()); + frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP); // Session session = frameHandler.getSession(); diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java index b590af6cafb..680f06f594f 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java @@ -35,12 +35,12 @@ import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.core.Frame; import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.OpCode; -import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; +import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; import org.junit.jupiter.api.Test; import static org.hamcrest.CoreMatchers.is; @@ -57,7 +57,7 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac // Establish endpoint function JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession); - frameHandler.onOpen(new FrameHandler.CoreSession.Empty()); + frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP); func.accept(frameHandler); return socket; } diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 8c859a15bf2..91d8a1d2485 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -18,6 +18,11 @@ package org.eclipse.jetty.websocket.common; +import java.lang.invoke.MethodHandle; +import java.nio.ByteBuffer; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Executor; + import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; @@ -26,14 +31,19 @@ import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException; -import org.eclipse.jetty.websocket.core.*; +import org.eclipse.jetty.websocket.core.BadPayloadException; +import org.eclipse.jetty.websocket.core.CloseException; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.MessageTooLargeException; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.ProtocolException; +import org.eclipse.jetty.websocket.core.UpgradeException; +import org.eclipse.jetty.websocket.core.WebSocketException; +import org.eclipse.jetty.websocket.core.WebSocketTimeoutException; -import java.lang.invoke.MethodHandle; -import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.Executor; - -public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor +public class JettyWebSocketFrameHandler implements FrameHandler { private final Logger log; private final Executor executor; @@ -104,63 +114,43 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor } @Override - public void onClosed(CloseStatus closeStatus) + public void onOpen(CoreSession coreSession, Callback callback) { - } - - @SuppressWarnings("Duplicates") - @Override - public void onError(Throwable cause) - { - cause = convertCause(cause); - futureSession.completeExceptionally(cause); - - if (errorHandle == null) - { - log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause); - if (log.isDebugEnabled()) - log.debug("unhandled", cause); - return; - } - try { - errorHandle.invoke(cause); + customizer.customize(coreSession); + + session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse); + + frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session); + openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session); + closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session); + errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session); + textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session); + binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session); + pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session); + pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session); + + if (textHandle != null) + textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize()); + + if (binaryHandle != null) + binarySink = JettyWebSocketFrameHandlerFactory + .createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize()); + + if (openHandle != null) + openHandle.invoke(); + + futureSession.complete(session); + callback.succeeded(); } - catch (Throwable t) + catch (Throwable cause) { - WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t); - wsError.addSuppressed(cause); - throw wsError; + // TODO should futureSession be failed here? + callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause)); } } - public static Throwable convertCause(Throwable cause) - { - if (cause instanceof MessageTooLargeException) - return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause); - - if (cause instanceof ProtocolException) - return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause); - - if (cause instanceof BadPayloadException) - return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause); - - if (cause instanceof CloseException) - return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause); - - if (cause instanceof WebSocketTimeoutException) - return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause); - - if (cause instanceof InvalidSignatureException) - return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause); - - if (cause instanceof UpgradeException) - return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause); - - return cause; - } - /** * @see #onFrame(Frame,Callback) */ @@ -185,61 +175,56 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor switch (frame.getOpCode()) { case OpCode.CLOSE: - onClose(frame, callback); + onCloseFrame(frame, callback); break; case OpCode.PING: - onPing(frame, callback); + onPingFrame(frame, callback); break; case OpCode.PONG: - onPong(frame, callback); + onPongFrame(frame, callback); break; case OpCode.TEXT: - onText(frame, callback); + onTextFrame(frame, callback); break; case OpCode.BINARY: - onBinary(frame, callback); + onBinaryFrame(frame, callback); break; case OpCode.CONTINUATION: - onContinuation(frame, callback); + onContinuationFrame(frame, callback); break; } } @Override - public void onOpen(CoreSession coreSession) + public void onError(Throwable cause, Callback callback) { - customizer.customize(coreSession); - - session = new WebSocketSessionImpl(coreSession, this, upgradeRequest, upgradeResponse); - - frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session); - openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session); - closeHandle = JettyWebSocketFrameHandlerFactory.bindTo(closeHandle, session); - errorHandle = JettyWebSocketFrameHandlerFactory.bindTo(errorHandle, session); - textHandle = JettyWebSocketFrameHandlerFactory.bindTo(textHandle, session); - binaryHandle = JettyWebSocketFrameHandlerFactory.bindTo(binaryHandle, session); - pingHandle = JettyWebSocketFrameHandlerFactory.bindTo(pingHandle, session); - pongHandle = JettyWebSocketFrameHandlerFactory.bindTo(pongHandle, session); - - if (textHandle != null) - textSink = JettyWebSocketFrameHandlerFactory.createMessageSink(textHandle, textSinkClass, executor, coreSession.getMaxTextMessageSize()); - - if (binaryHandle != null) - binarySink = JettyWebSocketFrameHandlerFactory.createMessageSink(binaryHandle, binarySinkClass, executor, coreSession.getMaxBinaryMessageSize()); - - if (openHandle != null) + try { - try - { - openHandle.invoke(); - } - catch (Throwable cause) - { - throw new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause); - } - } + cause = convertCause(cause); + futureSession.completeExceptionally(cause); - futureSession.complete(session); + if (errorHandle != null) + errorHandle.invoke(cause); + else + { + log.warn("Unhandled Error: Endpoint " + endpointInstance.getClass().getName() + " : " + cause); + if (log.isDebugEnabled()) + log.debug("unhandled", cause); + } + callback.succeeded(); + } + catch (Throwable t) + { + WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getName() + " ERROR method error: " + cause.getMessage(), t); + wsError.addSuppressed(cause); + callback.failed(wsError); + } + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + callback.succeeded(); } public String toString() @@ -259,7 +244,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor activeMessageSink = null; } - private void onBinary(Frame frame, Callback callback) + private void onBinaryFrame(Frame frame, Callback callback) { if (activeMessageSink == null) activeMessageSink = binarySink; @@ -267,7 +252,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor acceptMessage(frame, callback); } - private void onClose(Frame frame, Callback callback) + private void onCloseFrame(Frame frame, Callback callback) { if (closeHandle != null) { @@ -284,12 +269,12 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor callback.succeeded(); } - private void onContinuation(Frame frame, Callback callback) + private void onContinuationFrame(Frame frame, Callback callback) { acceptMessage(frame, callback); } - private void onPing(Frame frame, Callback callback) + private void onPingFrame(Frame frame, Callback callback) { if (pingHandle != null) { @@ -317,7 +302,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor callback.succeeded(); } - private void onPong(Frame frame, Callback callback) + private void onPongFrame(Frame frame, Callback callback) { if (pongHandle != null) { @@ -337,11 +322,39 @@ public class JettyWebSocketFrameHandler implements FrameHandler.Adaptor callback.succeeded(); } - private void onText(Frame frame, Callback callback) + private void onTextFrame(Frame frame, Callback callback) { if (activeMessageSink == null) activeMessageSink = textSink; acceptMessage(frame, callback); } + + + static Throwable convertCause(Throwable cause) + { + if (cause instanceof MessageTooLargeException) + return new org.eclipse.jetty.websocket.api.MessageTooLargeException(cause.getMessage(), cause); + + if (cause instanceof ProtocolException) + return new org.eclipse.jetty.websocket.api.ProtocolException(cause.getMessage(), cause); + + if (cause instanceof BadPayloadException) + return new org.eclipse.jetty.websocket.api.BadPayloadException(cause.getMessage(), cause); + + if (cause instanceof CloseException) + return new org.eclipse.jetty.websocket.api.CloseException(((CloseException)cause).getStatusCode(), cause.getMessage(), cause); + + if (cause instanceof WebSocketTimeoutException) + return new org.eclipse.jetty.websocket.api.WebSocketTimeoutException(cause.getMessage(), cause); + + if (cause instanceof InvalidSignatureException) + return new org.eclipse.jetty.websocket.api.InvalidWebSocketException(cause.getMessage(), cause); + + if (cause instanceof UpgradeException) + return new org.eclipse.jetty.websocket.api.UpgradeException(((UpgradeException)cause).getRequestURI(), cause); + + return cause; + } + } diff --git a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java index df5b3c23add..7176f014078 100644 --- a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java +++ b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java @@ -119,7 +119,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello?").setFin(true), Callback.NOOP); localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP); @@ -163,7 +163,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello Text Stream").setFin(true), Callback.NOOP); localEndpoint.onFrame(CloseStatus.toFrame(StatusCode.NORMAL, "Normal"), Callback.NOOP); @@ -185,7 +185,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hel").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("lo ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("Wor").setFin(false), Callback.NOOP); @@ -208,7 +208,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP); @@ -238,7 +238,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP); @@ -264,10 +264,10 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); - localEndpoint.onError(new RuntimeException("Nothing to see here")); + localEndpoint.onError(new RuntimeException("Nothing to see here"), Callback.NOOP); // Validate Events socket.events.assertEvents( @@ -284,7 +284,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload("World").setFin(true), Callback.NOOP); @@ -314,7 +314,7 @@ public class JettyWebSocketFrameHandlerTest JettyWebSocketFrameHandler localEndpoint = newLocalFrameHandler(socket); // Trigger Events - localEndpoint.onOpen(channel); + localEndpoint.onOpen(channel, Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.TEXT).setPayload("Hello").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.CONTINUATION).setPayload(" ").setFin(false), Callback.NOOP); localEndpoint.onFrame(new Frame(OpCode.PING).setPayload("You there?"), Callback.NOOP); diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java index 19d1b19f5ec..bf0a1826355 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/FrameHandler.java @@ -18,11 +18,6 @@ package org.eclipse.jetty.websocket.core; -import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.websocket.core.client.UpgradeRequest; -import org.eclipse.jetty.websocket.core.server.Negotiation; - import java.net.InetSocketAddress; import java.net.SocketAddress; import java.net.URI; @@ -30,6 +25,11 @@ import java.time.Duration; import java.util.List; import java.util.Map; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.websocket.core.client.UpgradeRequest; +import org.eclipse.jetty.websocket.core.server.Negotiation; + /** * Interface for local WebSocket Endpoint Frame handling. * @@ -124,74 +124,6 @@ public interface FrameHandler extends IncomingFrames return false; } - - interface Adaptor extends FrameHandler - { - @Override - default void onOpen(CoreSession coreSession, Callback callback) - { - try - { - onOpen(coreSession); - callback.succeeded(); - } - catch(Throwable t) - { - callback.failed(t); - } - } - - default void onOpen(CoreSession coreSession) throws Exception {} - - @Override - default void onFrame(Frame frame, Callback callback) - { - try - { - onFrame(frame); - callback.succeeded(); - } - catch(Throwable t) - { - callback.failed(t); - } - } - - default void onFrame(Frame frame) throws Exception {} - - @Override - default void onClosed(CloseStatus closeStatus, Callback callback) - { - try - { - onClosed(closeStatus); - callback.succeeded(); - } - catch(Throwable t) - { - callback.failed(t); - } - } - default void onClosed(CloseStatus closeStatus) throws Exception {} - - @Override - default void onError(Throwable cause, Callback callback) - { - try - { - onError(cause); - callback.succeeded(); - } - catch(Throwable t) - { - callback.failed(t); - } - } - - default void onError(Throwable cause) throws Exception {} - } - - interface Configuration { @@ -568,7 +500,7 @@ public interface FrameHandler extends IncomingFrames @Override public Duration getIdleTimeout() { - return timeout==null ? Duration.ofSeconds(0) : timeout; + return timeout==null ? Duration.ZERO : timeout; } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java index 566e93a3314..37828331948 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/MessageHandler.java @@ -18,14 +18,18 @@ package org.eclipse.jetty.websocket.core; -import org.eclipse.jetty.util.*; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; - import java.io.IOException; import java.nio.ByteBuffer; import java.util.function.Consumer; +import org.eclipse.jetty.util.BufferUtil; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingNestedCallback; +import org.eclipse.jetty.util.Utf8Appendable; +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + /** * A utility implementation of FrameHandler that defragments * text frames into a String message before calling {@link #onText(String, Callback)}. @@ -33,7 +37,7 @@ import java.util.function.Consumer; * may extend {@link #isDemanding()} to return true and then explicityly control * demand with calls to {@link org.eclipse.jetty.websocket.core.FrameHandler.CoreSession#demand(long)} */ -public class MessageHandler implements FrameHandler.Adaptor +public class MessageHandler implements FrameHandler { public static MessageHandler from(Consumer onText, Consumer onBinary) @@ -124,17 +128,18 @@ public class MessageHandler implements FrameHandler.Adaptor this.maxBinaryMessageSize = maxBinaryMessageSize; } - @Override - public void onOpen(CoreSession coreSession) throws Exception - { - this.coreSession = coreSession; - } - public CoreSession getCoreSession() { return coreSession; } + @Override + public void onOpen(CoreSession coreSession, Callback callback) + { + this.coreSession = coreSession; + callback.succeeded(); + } + @Override public void onFrame(Frame frame, Callback callback) { @@ -226,7 +231,38 @@ public class MessageHandler implements FrameHandler.Adaptor } @Override - public final void onFrame(Frame frame){} + public void onError(Throwable cause, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug(this + " onError ", cause); + callback.succeeded(); + } + + @Override + public void onClosed(CloseStatus closeStatus, Callback callback) + { + if (LOG.isDebugEnabled()) + LOG.debug("{} onClosed {}", this, closeStatus); + if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal()) + LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length()); + + if (binaryMessage != null) + { + if (BufferUtil.hasContent(binaryMessage)) + LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining()); + + getCoreSession().getByteBufferPool().release(binaryMessage); + binaryMessage = null; + } + + if (utf8StringBuilder != null) + { + utf8StringBuilder.reset(); + utf8StringBuilder = null; + } + coreSession = null; + callback.succeeded(); + } private void onTextFrame(Frame frame, Callback callback) { @@ -422,36 +458,4 @@ public class MessageHandler implements FrameHandler.Adaptor } }.iterate(); } - - @Override - public void onClosed(CloseStatus closeStatus) throws Exception - { - if (LOG.isDebugEnabled()) - LOG.debug("{} onClosed {}", this, closeStatus); - if (utf8StringBuilder != null && utf8StringBuilder.length() > 0 && closeStatus.isNormal()) - LOG.warn("{} closed with partial message: {} chars", utf8StringBuilder.length()); - - if (binaryMessage != null) - { - if (BufferUtil.hasContent(binaryMessage)) - LOG.warn("{} closed with partial message: {} bytes", binaryMessage.remaining()); - - getCoreSession().getByteBufferPool().release(binaryMessage); - binaryMessage = null; - } - - if (utf8StringBuilder != null) - { - utf8StringBuilder.reset(); - utf8StringBuilder = null; - } - coreSession = null; - } - - @Override - public void onError(Throwable cause) throws Exception - { - if (LOG.isDebugEnabled()) - LOG.debug(this + " onError ", cause); - } } diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java index 840ad5a8c36..7db48c1d95f 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/internal/WebSocketChannel.java @@ -18,16 +18,6 @@ package org.eclipse.jetty.websocket.core.internal; -import org.eclipse.jetty.io.ByteBufferPool; -import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.IteratingCallback; -import org.eclipse.jetty.util.Utf8Appendable; -import org.eclipse.jetty.util.component.Dumpable; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.core.*; -import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame; - import java.io.IOException; import java.net.SocketAddress; import java.net.SocketTimeoutException; @@ -40,6 +30,28 @@ import java.util.Queue; import java.util.concurrent.Executor; import java.util.concurrent.TimeoutException; +import org.eclipse.jetty.io.ByteBufferPool; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.IteratingCallback; +import org.eclipse.jetty.util.Utf8Appendable; +import org.eclipse.jetty.util.component.Dumpable; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; +import org.eclipse.jetty.websocket.core.Behavior; +import org.eclipse.jetty.websocket.core.CloseException; +import org.eclipse.jetty.websocket.core.CloseStatus; +import org.eclipse.jetty.websocket.core.Extension; +import org.eclipse.jetty.websocket.core.ExtensionConfig; +import org.eclipse.jetty.websocket.core.Frame; +import org.eclipse.jetty.websocket.core.FrameHandler; +import org.eclipse.jetty.websocket.core.IncomingFrames; +import org.eclipse.jetty.websocket.core.OpCode; +import org.eclipse.jetty.websocket.core.OutgoingFrames; +import org.eclipse.jetty.websocket.core.ProtocolException; +import org.eclipse.jetty.websocket.core.WebSocketConstants; +import org.eclipse.jetty.websocket.core.WebSocketTimeoutException; +import org.eclipse.jetty.websocket.core.internal.Parser.ParsedFrame; + import static org.eclipse.jetty.util.Callback.NOOP; /** @@ -469,9 +481,6 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio @Override public void sendFrame(Frame frame, Callback callback, boolean batch) { - if (LOG.isDebugEnabled()) - LOG.debug("sendFrame({}, {}, {})", frame, callback, batch); - try { assertValidOutgoing(frame); @@ -487,26 +496,29 @@ public class WebSocketChannel implements IncomingFrames, FrameHandler.CoreSessio synchronized(flusher) { boolean closeConnection = channelState.onOutgoingFrame(frame); + if (LOG.isDebugEnabled()) + LOG.debug("sendFrame({}, {}, {}) {}", frame, callback, batch, closeConnection); - if (frame.getOpCode() == OpCode.CLOSE) + if (closeConnection) { - if (LOG.isDebugEnabled()) - LOG.debug("close({}, {}, {})", CloseStatus.getCloseStatus(frame), callback, batch); + Throwable cause = AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame)); - if (closeConnection) - { - callback = new Callback.Nested(callback) - { - @Override - public void completed() - { - closeConnection(AbnormalCloseStatus.getCause(CloseStatus.getCloseStatus(frame)), channelState.getCloseStatus(),NOOP); - } - }; - } + Callback closeConnectionCallback = Callback.from( + ()->closeConnection(cause, channelState.getCloseStatus(), callback), + x->closeConnection(cause, channelState.getCloseStatus(), Callback.from( + ()-> callback.failed(x), + x2-> + { + x.addSuppressed(x2); + callback.failed(x); + }))); + + flusher.queue.offer(new FrameEntry(frame, closeConnectionCallback, false)); + } + else + { + flusher.queue.offer(new FrameEntry(frame, callback, batch)); } - - flusher.queue.offer(new FrameEntry(frame, callback, batch)); } flusher.iterate(); } diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java index 0ff0a1fab50..b8d53db1e31 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/AbstractTestFrameHandler.java @@ -18,14 +18,14 @@ package org.eclipse.jetty.websocket.core; +import java.nio.ByteBuffer; + import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import java.nio.ByteBuffer; - import static org.eclipse.jetty.websocket.core.OpCode.PONG; /** @@ -35,7 +35,7 @@ import static org.eclipse.jetty.websocket.core.OpCode.PONG; * NOTE: The introduction of WebSocket over HTTP/2 might change the behavior and implementation some. *

*/ -public class AbstractTestFrameHandler implements FrameHandler.Adaptor +public class AbstractTestFrameHandler implements SynchronousFrameHandler { private Logger LOG = Log.getLogger(AbstractTestFrameHandler.class); private byte partial = OpCode.UNDEFINED; diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java index 50f7107e3a4..f7d5b3c86e8 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/MessageHandlerTest.java @@ -111,7 +111,7 @@ public class MessageHandlerTest } }; - handler.onOpen(session); + handler.onOpen(session, NOOP); } @Test @@ -350,7 +350,7 @@ public class MessageHandlerTest FutureCallback callback; handler.setMaxTextMessageSize(4); - handler.onOpen(session); + handler.onOpen(session, NOOP); callback = new FutureCallback(); handler.onFrame(new Frame(OpCode.TEXT, true, "Testing"), callback); @@ -369,7 +369,7 @@ public class MessageHandlerTest FutureCallback callback; handler.setMaxTextMessageSize(4); - handler.onOpen(session); + handler.onOpen(session, NOOP); callback = new FutureCallback(); handler.onFrame(new Frame(OpCode.TEXT, false, "123"), callback); @@ -570,7 +570,7 @@ public class MessageHandlerTest FutureCallback callback; handler.setMaxBinaryMessageSize(4); - handler.onOpen(session); + handler.onOpen(session, NOOP); callback = new FutureCallback(); handler.onFrame(new Frame(OpCode.BINARY, true, "Testing"), callback); @@ -589,7 +589,7 @@ public class MessageHandlerTest FutureCallback callback; handler.setMaxBinaryMessageSize(4); - handler.onOpen(session); + handler.onOpen(session, NOOP); callback = new FutureCallback(); handler.onFrame(new Frame(OpCode.BINARY, false, "123"), callback); @@ -653,7 +653,7 @@ public class MessageHandlerTest } }; - handler.onOpen(session); + handler.onOpen(session, NOOP); FutureCallback callback; @@ -681,7 +681,7 @@ public class MessageHandlerTest } }; - handler.onOpen(session); + handler.onOpen(session, NOOP); FutureCallback callback; diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java new file mode 100644 index 00000000000..07d84952dd6 --- /dev/null +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/SynchronousFrameHandler.java @@ -0,0 +1,87 @@ +// +// ======================================================================== +// Copyright (c) 1995-2019 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.websocket.core; + +import org.eclipse.jetty.util.Callback; + +public interface SynchronousFrameHandler extends FrameHandler +{ + @Override + default void onOpen(CoreSession coreSession, Callback callback) + { + try + { + onOpen(coreSession); + callback.succeeded(); + } + catch(Throwable t) + { + callback.failed(t); + } + } + + default void onOpen(CoreSession coreSession) throws Exception {} + + @Override + default void onFrame(Frame frame, Callback callback) + { + try + { + onFrame(frame); + callback.succeeded(); + } + catch(Throwable t) + { + callback.failed(t); + } + } + + default void onFrame(Frame frame) throws Exception {} + + @Override + default void onClosed(CloseStatus closeStatus, Callback callback) + { + try + { + onClosed(closeStatus); + callback.succeeded(); + } + catch(Throwable t) + { + callback.failed(t); + } + } + default void onClosed(CloseStatus closeStatus) throws Exception {} + + @Override + default void onError(Throwable cause, Callback callback) + { + try + { + onError(cause); + callback.succeeded(); + } + catch(Throwable t) + { + callback.failed(t); + } + } + + default void onError(Throwable cause) throws Exception {} +} diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java index 0f368b35619..5f644880238 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/TestFrameHandler.java @@ -18,18 +18,18 @@ package org.eclipse.jetty.websocket.core; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; + import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; - -public class TestFrameHandler implements FrameHandler.Adaptor +public class TestFrameHandler implements SynchronousFrameHandler { - private static Logger LOG = Log.getLogger(TestFrameHandler.class); + private static Logger LOG = Log.getLogger(SynchronousFrameHandler.class); private CoreSession session; diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java index 3011cbe3147..ca72b60db6b 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketCloseTest.java @@ -18,6 +18,11 @@ package org.eclipse.jetty.websocket.core; +import java.net.Socket; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.Server; @@ -39,16 +44,14 @@ import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import java.net.Socket; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; - import static org.eclipse.jetty.util.Callback.NOOP; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.containsString; import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.*; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests of a core server with a fake client @@ -352,7 +355,7 @@ public class WebSocketCloseTest extends WebSocketTester assertThat(server.handler.closeStatus.getReason(), containsString("onReceiveFrame throws for binary frames")); } - static class TestFrameHandler implements FrameHandler.Adaptor + static class TestFrameHandler implements SynchronousFrameHandler { private CoreSession session; String state; diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java index 5dbd77be63b..a5dea27d502 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/WebSocketOpenTest.java @@ -18,6 +18,12 @@ package org.eclipse.jetty.websocket.core; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Exchanger; +import java.util.concurrent.TimeUnit; +import java.util.function.BiFunction; + import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.NetworkConnector; import org.eclipse.jetty.server.Server; @@ -39,16 +45,14 @@ import org.eclipse.jetty.websocket.core.server.internal.RFC6455Handshaker; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Test; -import java.net.Socket; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Exchanger; -import java.util.concurrent.TimeUnit; -import java.util.function.BiFunction; - import static org.eclipse.jetty.util.Callback.NOOP; import static org.hamcrest.MatcherAssert.assertThat; -import static org.hamcrest.Matchers.*; -import static org.junit.jupiter.api.Assertions.*; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertThrows; +import static org.junit.jupiter.api.Assertions.assertTrue; /** * Tests of a core server with a fake client @@ -83,7 +87,7 @@ public class WebSocketOpenTest extends WebSocketTester setup((s,c)-> { assertThat(s.toString(),containsString("CONNECTED")); - TestFrameHandler.sendText(s,"Hello", c); + WebSocketOpenTest.TestFrameHandler.sendText(s,"Hello", c); s.demand(1); return null; }); @@ -170,7 +174,7 @@ public class WebSocketOpenTest extends WebSocketTester Thread.sleep(100); // Can send while onOpen is active - TestFrameHandler.sendText(session,"Hello", NOOP); + WebSocketOpenTest.TestFrameHandler.sendText(session,"Hello", NOOP); Parser.ParsedFrame frame = receiveFrame(client.getInputStream()); assertThat(frame.getPayloadAsUTF8(),is("Hello")); @@ -203,7 +207,7 @@ public class WebSocketOpenTest extends WebSocketTester - static class TestFrameHandler implements FrameHandler.Adaptor + static class TestFrameHandler implements SynchronousFrameHandler { private CoreSession session; private BiFunction onOpen; @@ -351,7 +355,7 @@ public class WebSocketOpenTest extends WebSocketTester public void sendText(String text) { LOG.info("sending {}...", text); - TestFrameHandler.sendText(handler.session, text); + WebSocketOpenTest.TestFrameHandler.sendText(handler.session, text); } public void close() diff --git a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java index 9dacb1b4a0e..ad9f7fdc93d 100644 --- a/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java +++ b/jetty-websocket/websocket-core/src/test/java/org/eclipse/jetty/websocket/core/chat/ChatWebSocketServer.java @@ -18,6 +18,15 @@ package org.eclipse.jetty.websocket.core.chat; +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + import org.eclipse.jetty.server.HttpConnectionFactory; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Server; @@ -34,14 +43,6 @@ import org.eclipse.jetty.websocket.core.server.Negotiation; import org.eclipse.jetty.websocket.core.server.WebSocketNegotiator; import org.eclipse.jetty.websocket.core.server.WebSocketUpgradeHandler; -import javax.servlet.ServletException; -import javax.servlet.http.HttpServletRequest; -import javax.servlet.http.HttpServletResponse; -import java.io.IOException; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - import static org.eclipse.jetty.util.Callback.NOOP; public class ChatWebSocketServer @@ -67,14 +68,12 @@ public class ChatWebSocketServer // + MUST return the FrameHandler or null or exception? return new MessageHandler() { - @Override - public void onOpen(CoreSession coreSession) throws Exception + public void onOpen(CoreSession coreSession, Callback callback) { LOG.debug("onOpen {}", coreSession); setMaxTextMessageSize(2 * 1024); - super.onOpen(coreSession); - members.add(this); + super.onOpen(coreSession, Callback.from(()->{members.add(this); callback.succeeded();},x->callback.failed(x))); } @Override @@ -92,10 +91,10 @@ public class ChatWebSocketServer } @Override - public void onClosed(CloseStatus closeStatus) throws Exception + public void onClosed(CloseStatus closeStatus, Callback callback) { LOG.debug("onClosed {}", closeStatus); - super.onClosed(closeStatus); + super.onClosed(closeStatus, Callback.from(()->members.remove(this),callback)); members.remove(this); } };