diff --git a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxClientUpgradeRequest.java b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxClientUpgradeRequest.java index b3ee91af5dc..2494f81b862 100644 --- a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxClientUpgradeRequest.java +++ b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxClientUpgradeRequest.java @@ -19,52 +19,41 @@ package org.eclipse.jetty.websocket.javax.client; import java.net.URI; -import java.util.concurrent.CompletableFuture; - -import javax.websocket.Session; import org.eclipse.jetty.client.HttpResponse; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; public class JavaxClientUpgradeRequest extends ClientUpgradeRequest { private final JavaxWebSocketClientContainer containerContext; - private final Object websocketPojo; - private final CompletableFuture futureSession; + private final JavaxWebSocketFrameHandler frameHandler; + public JavaxClientUpgradeRequest(JavaxWebSocketClientContainer clientContainer, WebSocketCoreClient coreClient, URI requestURI, Object websocketPojo) { super(coreClient, requestURI); this.containerContext = clientContainer; - this.websocketPojo = websocketPojo; - this.futureSession = new CompletableFuture<>(); - } - @Override - protected void handleException(Throwable failure) - { - super.handleException(failure); - futureSession.completeExceptionally(failure); - } - - @Override - public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response) - { UpgradeRequest upgradeRequest = new DelegatedJavaxClientUpgradeRequest(this); - UpgradeResponse upgradeResponse = new DelegatedJavaxClientUpgradeResponse(response); + frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest); + } - JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession); + @Override + public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection) + { + frameHandler.setUpgradeRequest(new DelegatedJavaxClientUpgradeRequest(this)); + frameHandler.setUpgradeResponse(new DelegatedJavaxClientUpgradeResponse(response)); + super.upgrade(response, httpConnection); + } + @Override + public FrameHandler getFrameHandler() + { return frameHandler; } - - public CompletableFuture getFutureSession() - { - return futureSession; - } } diff --git a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java index e051d723a28..3dc118a4907 100644 --- a/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java +++ b/jetty-websocket/javax-websocket-client/src/main/java/org/eclipse/jetty/websocket/javax/client/JavaxWebSocketClientContainer.java @@ -45,6 +45,7 @@ import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint; import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig; +import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory; /** @@ -106,17 +107,29 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple private CompletableFuture connect(JavaxClientUpgradeRequest upgradeRequest) { upgradeRequest.setConfiguration(defaultCustomizer); - CompletableFuture fut = upgradeRequest.getFutureSession(); + CompletableFuture futureSession = new CompletableFuture<>(); + try { - getWebSocketCoreClient().connect(upgradeRequest); - return fut; + WebSocketCoreClient coreClient = getWebSocketCoreClient(); + coreClient.connect(upgradeRequest).whenComplete((coreSession, error)-> + { + if (error != null) + { + futureSession.completeExceptionally(error); + return; + } + + JavaxWebSocketFrameHandler frameHandler = (JavaxWebSocketFrameHandler)upgradeRequest.getFrameHandler(); + futureSession.complete(frameHandler.getSession()); + }); } catch (Exception e) { - fut.completeExceptionally(e); - return fut; + futureSession.completeExceptionally(e); } + + return futureSession; } private Session connect(ConfiguredEndpoint configuredEndpoint, URI destURI) throws IOException @@ -141,7 +154,7 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple upgradeRequest.setSubProtocols(clientEndpointConfig.getPreferredSubprotocols()); } - long timeout = coreClient.getHttpClient().getConnectTimeout(); + long timeout = getWebSocketCoreClient().getHttpClient().getConnectTimeout(); try { Future sessionFuture = connect(upgradeRequest); diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java index bc6654f5e87..e7bd960a24d 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketContainer.java @@ -23,12 +23,10 @@ import java.util.ArrayList; import java.util.HashSet; import java.util.List; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import java.util.function.Consumer; import javax.websocket.Extension; -import javax.websocket.Session; import javax.websocket.WebSocketContainer; import org.eclipse.jetty.io.ByteBufferPool; @@ -156,10 +154,9 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme return sessionTracker.getSessions(); } - public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, - CompletableFuture futureSession) + public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest) { - return getFrameHandlerFactory().newJavaxWebSocketFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession); + return getFrameHandlerFactory().newJavaxWebSocketFrameHandler(websocketPojo, upgradeRequest); } /** diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java index 4aeb62293fa..70907557192 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandler.java @@ -27,7 +27,6 @@ import java.util.HashMap; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import javax.websocket.CloseReason; @@ -35,7 +34,6 @@ import javax.websocket.Decoder; import javax.websocket.EndpointConfig; import javax.websocket.MessageHandler; import javax.websocket.PongMessage; -import javax.websocket.Session; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -96,17 +94,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler private JavaxWebSocketFrameHandlerMetadata.MessageMetadata binaryMetadata; // TODO: need pingHandle ? private MethodHandle pongHandle; - /** - * Immutable HandshakeRequest available via Session - */ - private final UpgradeRequest upgradeRequest; - /** - * Immutable javax.websocket.HandshakeResponse available via Session - */ - private final UpgradeResponse upgradeResponse; - private final String id; + + private UpgradeRequest upgradeRequest; + private UpgradeResponse upgradeResponse; + private final EndpointConfig endpointConfig; - private final CompletableFuture futureSession; private MessageSink textSink; private MessageSink binarySink; private MessageSink activeMessageSink; @@ -118,14 +110,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler public JavaxWebSocketFrameHandler(JavaxWebSocketContainer container, Object endpointInstance, - UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle, JavaxWebSocketFrameHandlerMetadata.MessageMetadata textMetadata, JavaxWebSocketFrameHandlerMetadata.MessageMetadata binaryMetadata, MethodHandle pongHandle, - String id, - EndpointConfig endpointConfig, - CompletableFuture futureSession) + EndpointConfig endpointConfig) { this.LOG = Log.getLogger(endpointInstance.getClass()); @@ -137,8 +126,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler throw oops; } this.endpointInstance = endpointInstance; - this.upgradeRequest = upgradeRequest; - this.upgradeResponse = upgradeResponse; this.openHandle = openHandle; this.closeHandle = closeHandle; @@ -147,9 +134,7 @@ public class JavaxWebSocketFrameHandler implements FrameHandler this.binaryMetadata = binaryMetadata; this.pongHandle = pongHandle; - this.id = id; this.endpointConfig = endpointConfig; - this.futureSession = futureSession; this.messageHandlerMap = new HashMap<>(); } @@ -174,7 +159,7 @@ public class JavaxWebSocketFrameHandler implements FrameHandler try { this.coreSession = coreSession; - session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig); + session = new JavaxWebSocketSession(container, coreSession, this, endpointConfig); openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig); closeHandle = InvokerUtils.bindTo(closeHandle, session); @@ -214,13 +199,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session)); callback.succeeded(); - futureSession.complete(session); } catch (Throwable cause) { Exception wse = new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause); callback.failed(wse); - futureSession.completeExceptionally(wse); } } @@ -281,8 +264,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler { try { - futureSession.completeExceptionally(cause); - if (errorHandle != null) errorHandle.invoke(cause); else @@ -294,7 +275,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getSimpleName() + " ERROR method error: " + cause.getMessage(), t); wsError.addSuppressed(cause); callback.failed(wsError); - // TODO should futureSession be failed here? } } @@ -630,4 +610,24 @@ public class JavaxWebSocketFrameHandler implements FrameHandler throw new ProtocolException("Unable to process continuation during dataType " + dataType); } } + + public void setUpgradeRequest(UpgradeRequest upgradeRequest) + { + this.upgradeRequest = upgradeRequest; + } + + public void setUpgradeResponse(UpgradeResponse upgradeResponse) + { + this.upgradeResponse = upgradeResponse; + } + + public UpgradeRequest getUpgradeRequest() + { + return upgradeRequest; + } + + public UpgradeResponse getUpgradeResponse() + { + return upgradeResponse; + } } diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandlerFactory.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandlerFactory.java index cf37b660e2f..dc8e878ce38 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandlerFactory.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketFrameHandlerFactory.java @@ -31,7 +31,6 @@ import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import javax.websocket.CloseReason; @@ -108,9 +107,7 @@ public abstract class JavaxWebSocketFrameHandlerFactory public abstract JavaxWebSocketFrameHandlerMetadata createMetadata(Class endpointClass, EndpointConfig endpointConfig); - public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest, - UpgradeResponse upgradeResponse, - CompletableFuture futureSession) + public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest) { Object endpoint; EndpointConfig config; @@ -162,22 +159,13 @@ public abstract class JavaxWebSocketFrameHandlerFactory errorHandle = InvokerUtils.bindTo(errorHandle, endpoint); pongHandle = InvokerUtils.bindTo(pongHandle, endpoint); - CompletableFuture future = futureSession; - if (future == null) - future = new CompletableFuture<>(); - - String id = upgradeRequest.toString(); - JavaxWebSocketFrameHandler frameHandler = new JavaxWebSocketFrameHandler( container, endpoint, - upgradeRequest, upgradeResponse, openHandle, closeHandle, errorHandle, textMetadata, binaryMetadata, pongHandle, - id, - config, - future); + config); return frameHandler; } diff --git a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java index 5de50d041c3..313eebf4f3a 100644 --- a/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java +++ b/jetty-websocket/javax-websocket-common/src/main/java/org/eclipse/jetty/websocket/javax/common/JavaxWebSocketSession.java @@ -61,10 +61,8 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we protected final SharedBlockingCallback blocking = new SharedBlockingCallback(); private final JavaxWebSocketContainer container; private final FrameHandler.CoreSession coreSession; - private final Principal principal; private final JavaxWebSocketFrameHandler frameHandler; private final EndpointConfig config; - private final String id; private final AvailableDecoders availableDecoders; private final AvailableEncoders availableEncoders; private final Map pathParameters; @@ -77,15 +75,11 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we public JavaxWebSocketSession(JavaxWebSocketContainer container, FrameHandler.CoreSession coreSession, JavaxWebSocketFrameHandler frameHandler, - Principal upgradeRequestPrincipal, - String id, EndpointConfig endpointConfig) { this.container = container; this.coreSession = coreSession; this.frameHandler = frameHandler; - this.principal = upgradeRequestPrincipal; - this.id = id; this.config = endpointConfig == null?new BasicEndpointConfig():endpointConfig; @@ -139,7 +133,6 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we } frameHandler.addMessageHandler(this, clazz, handler); - } /** @@ -308,7 +301,7 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we @Override public String getId() { - return this.id; + return this.frameHandler.getUpgradeRequest().toString(); } /** @@ -516,7 +509,7 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we @Override public Principal getUserPrincipal() { - return this.principal; + return this.frameHandler.getUpgradeRequest().getUserPrincipal(); } /** diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractJavaxWebSocketFrameHandlerTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractJavaxWebSocketFrameHandlerTest.java index 423fc860765..4bb07f4a547 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractJavaxWebSocketFrameHandlerTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractJavaxWebSocketFrameHandlerTest.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.javax.common; import java.util.HashMap; import java.util.Map; -import java.util.concurrent.CompletableFuture; import javax.websocket.EndpointConfig; @@ -67,10 +66,8 @@ public abstract class AbstractJavaxWebSocketFrameHandlerTest BasicEndpointConfig config = new BasicEndpointConfig(); ConfiguredEndpoint endpoint = new ConfiguredEndpoint(websocket, config); UpgradeRequest upgradeRequest = new UpgradeRequestAdapter(); - UpgradeResponse upgradeResponse = new UpgradeResponseAdapter(); - JavaxWebSocketFrameHandler localEndpoint = factory.newJavaxWebSocketFrameHandler(endpoint, - upgradeRequest, upgradeResponse, new CompletableFuture<>()); + JavaxWebSocketFrameHandler localEndpoint = factory.newJavaxWebSocketFrameHandler(endpoint, upgradeRequest); return localEndpoint; } diff --git a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java index d707234938e..b7dc6d895ad 100644 --- a/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java +++ b/jetty-websocket/javax-websocket-common/src/test/java/org/eclipse/jetty/websocket/javax/common/AbstractSessionTest.java @@ -38,18 +38,9 @@ public abstract class AbstractSessionTest container.start(); Object websocketPojo = new DummyEndpoint(); UpgradeRequest upgradeRequest = new UpgradeRequestAdapter(); - UpgradeResponse upgradeResponse = new UpgradeResponseAdapter(); - JavaxWebSocketFrameHandler frameHandler = - container.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, null); + JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest); FrameHandler.CoreSession coreSession = new FrameHandler.CoreSession.Empty(); - String id = "dummy"; - EndpointConfig endpointConfig = null; - session = new JavaxWebSocketSession(container, - coreSession, - frameHandler, - null, - id, - endpointConfig); + session = new JavaxWebSocketSession(container, coreSession, frameHandler, null); container.addManaged(session); } diff --git a/jetty-websocket/javax-websocket-server/src/main/java/org/eclipse/jetty/websocket/javax/server/JavaxWebSocketServerFrameHandlerFactory.java b/jetty-websocket/javax-websocket-server/src/main/java/org/eclipse/jetty/websocket/javax/server/JavaxWebSocketServerFrameHandlerFactory.java index 12d7746ae20..8a5bdee8907 100644 --- a/jetty-websocket/javax-websocket-server/src/main/java/org/eclipse/jetty/websocket/javax/server/JavaxWebSocketServerFrameHandlerFactory.java +++ b/jetty-websocket/javax-websocket-server/src/main/java/org/eclipse/jetty/websocket/javax/server/JavaxWebSocketServerFrameHandlerFactory.java @@ -18,11 +18,8 @@ package org.eclipse.jetty.websocket.javax.server; -import java.util.concurrent.CompletableFuture; - import javax.websocket.Endpoint; import javax.websocket.EndpointConfig; -import javax.websocket.Session; import javax.websocket.server.ServerEndpoint; import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec; @@ -32,7 +29,6 @@ import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactor import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerMetadata; import org.eclipse.jetty.websocket.javax.server.internal.DelegatedJavaxServletUpgradeRequest; import org.eclipse.jetty.websocket.javax.server.internal.PathParamIdentifier; -import org.eclipse.jetty.websocket.javax.server.internal.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.servlet.FrameHandlerFactory; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; @@ -68,7 +64,6 @@ public class JavaxWebSocketServerFrameHandlerFactory extends JavaxWebSocketFrame @Override public FrameHandler newFrameHandler(Object websocketPojo, ServletUpgradeRequest upgradeRequest, ServletUpgradeResponse upgradeResponse) { - CompletableFuture completableFuture = new CompletableFuture<>(); - return newJavaxWebSocketFrameHandler(websocketPojo, new DelegatedJavaxServletUpgradeRequest(upgradeRequest), new UpgradeResponseAdapter(upgradeResponse), completableFuture); + return newJavaxWebSocketFrameHandler(websocketPojo, new DelegatedJavaxServletUpgradeRequest(upgradeRequest)); } } diff --git a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java index b816be3edf5..e57c86d9bd5 100644 --- a/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java +++ b/jetty-websocket/javax-websocket-tests/src/main/java/org/eclipse/jetty/websocket/javax/tests/NetworkFuzzer.java @@ -25,10 +25,10 @@ import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CountDownLatch; import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.TimeUnit; -import org.eclipse.jetty.client.HttpResponse; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.BufferUtil; @@ -42,6 +42,8 @@ import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest; import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; import org.eclipse.jetty.websocket.core.internal.Generator; +import static org.junit.jupiter.api.Assertions.assertTrue; + public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable { private final LocalServer server; @@ -66,8 +68,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab super(); this.server = server; this.client = new WebSocketCoreClient(); - CompletableFuture futureOnCapture = new CompletableFuture<>(); - this.upgradeRequest = new RawUpgradeRequest(client, wsURI, futureOnCapture); + this.upgradeRequest = new RawUpgradeRequest(client, wsURI); if (requestHeaders != null) { HttpFields fields = this.upgradeRequest.getHeaders(); @@ -81,7 +82,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab this.generator = new UnitGenerator(Behavior.CLIENT); CompletableFuture futureHandler = this.client.connect(upgradeRequest); - CompletableFuture futureCapture = futureHandler.thenCombine(futureOnCapture, (session, capture) -> capture); + CompletableFuture futureCapture = futureHandler.thenCombine(upgradeRequest.getFuture(), (session, capture) -> capture); this.frameCapture = futureCapture.get(10, TimeUnit.SECONDS); } @@ -186,27 +187,31 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab public static class RawUpgradeRequest extends ClientUpgradeRequest { + private final FrameCapture frameCapture = new FrameCapture(); private final CompletableFuture futureCapture; - private EndPoint endPoint; - public RawUpgradeRequest(WebSocketCoreClient webSocketClient, URI requestURI, CompletableFuture futureCapture) + public RawUpgradeRequest(WebSocketCoreClient webSocketClient, URI requestURI) { super(webSocketClient, requestURI); - this.futureCapture = futureCapture; + this.futureCapture = new CompletableFuture<>(); + } + + public CompletableFuture getFuture() + { + return futureCapture; } @Override - public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response) + public FrameHandler getFrameHandler() { - FrameCapture frameCapture = new FrameCapture(this.endPoint); - futureCapture.complete(frameCapture); return frameCapture; } @Override protected void customize(EndPoint endp) { - this.endPoint = endp; + frameCapture.setEndPoint(endp); + futureCapture.complete(frameCapture); } @Override @@ -220,20 +225,21 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab public static class FrameCapture implements FrameHandler { private final BlockingQueue receivedFrames = new LinkedBlockingQueue<>(); - private final EndPoint endPoint; + private EndPoint endPoint; + private CountDownLatch openLatch = new CountDownLatch(1); private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback(); private CoreSession coreSession; - public FrameCapture(EndPoint endPoint) + public void setEndPoint(EndPoint endpoint) { - this.endPoint = endPoint; + this.endPoint = endpoint; } - @Override public void onOpen(CoreSession coreSession, Callback callback) { this.coreSession = coreSession; + this.openLatch.countDown(); callback.succeeded(); } @@ -256,14 +262,22 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab callback.succeeded(); } - public void writeRaw(ByteBuffer buffer) throws IOException { + try + { + assertTrue(openLatch.await(1, TimeUnit.SECONDS)); + } + catch (InterruptedException e) + { + throw new IOException(e); + } + synchronized (this) { try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire()) { - this.endPoint.write(blocker, buffer); + endPoint.write(blocker, buffer); } } } diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/AbstractClientSessionTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/AbstractClientSessionTest.java index 2199ba781d6..ac8074a647a 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/AbstractClientSessionTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/AbstractClientSessionTest.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.javax.tests.client; -import javax.websocket.EndpointConfig; - import org.eclipse.jetty.websocket.core.FrameHandler; import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer; @@ -27,8 +25,6 @@ import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.tests.DummyEndpoint; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; @@ -45,18 +41,9 @@ public abstract class AbstractClientSessionTest container.start(); Object websocketPojo = new DummyEndpoint(); UpgradeRequest upgradeRequest = new UpgradeRequestAdapter(); - UpgradeResponse upgradeResponse = new UpgradeResponseAdapter(); - JavaxWebSocketFrameHandler frameHandler = - container.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, null); + JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest); FrameHandler.CoreSession coreSession = new FrameHandler.CoreSession.Empty(); - String id = "dummy"; - EndpointConfig endpointConfig = null; - session = new JavaxWebSocketSession(container, - coreSession, - frameHandler, - null, - id, - endpointConfig); + session = new JavaxWebSocketSession(container, coreSession, frameHandler, null); container.addManaged(session); } diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java index d177458320f..15454c9319b 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/OnCloseTest.java @@ -20,11 +20,10 @@ package org.eclipse.jetty.websocket.javax.tests.client; import java.util.List; import java.util.concurrent.BlockingQueue; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.stream.Stream; + import javax.websocket.ClientEndpointConfig; -import javax.websocket.Session; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.websocket.core.CloseStatus; @@ -34,8 +33,6 @@ import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket; import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket; @@ -107,10 +104,7 @@ public class OnCloseTest container.start(); UpgradeRequest request = new UpgradeRequestAdapter(); - UpgradeResponse response = new UpgradeResponseAdapter(); - CompletableFuture futureSession = new CompletableFuture<>(); - - JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession); + JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request); frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP); // Execute onClose call diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java index ebee871ac30..0556c50be7f 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/SessionAddMessageHandlerTest.java @@ -21,12 +21,10 @@ package org.eclipse.jetty.websocket.javax.tests.client; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.List; -import java.util.concurrent.CompletableFuture; import javax.websocket.ClientEndpoint; import javax.websocket.ClientEndpointConfig; import javax.websocket.MessageHandler; -import javax.websocket.Session; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.Callback; @@ -42,8 +40,6 @@ import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactor import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.tests.MessageType; import org.eclipse.jetty.websocket.javax.tests.SessionMatchers; import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler; @@ -80,11 +76,9 @@ public class SessionAddMessageHandlerTest ConfiguredEndpoint ei = new ConfiguredEndpoint(new DummyEndpoint(), endpointConfig); UpgradeRequest handshakeRequest = new UpgradeRequestAdapter(); - UpgradeResponse handshakeResponse = new UpgradeResponseAdapter(); JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container); - CompletableFuture futureSession = new CompletableFuture<>(); - frameHandler = frameHandlerFactory.newJavaxWebSocketFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession); + frameHandler = frameHandlerFactory.newJavaxWebSocketFrameHandler(ei, handshakeRequest); frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP); // Session diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java index acbc3d27145..73fe4427531 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/client/misbehaving/MisbehavingClassTest.java @@ -34,7 +34,6 @@ import org.junit.jupiter.api.Test; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; -import static org.junit.jupiter.api.Assertions.assertThrows; public class MisbehavingClassTest { @@ -62,11 +61,9 @@ public class MisbehavingClassTest try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class)) { - // Expecting CloseException during onOpen(). - assertThrows(CloseException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()"); - + // expecting RuntimeException during onOpen + container.connectToServer(socket, server.getWsUri()); assertThat("Close should have occurred", socket.closeLatch.await(1, TimeUnit.SECONDS), is(true)); - Throwable cause = socket.errors.pop(); assertThat("Error", cause, instanceOf(RuntimeException.class)); } @@ -81,11 +78,9 @@ public class MisbehavingClassTest try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class)) { - // Expecting CloseException during onOpen(). - assertThrows(CloseException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()"); - + // expecting RuntimeException during onOpen + container.connectToServer(socket, server.getWsUri()); assertThat("Close should have occurred", socket.closeLatch.await(5, TimeUnit.SECONDS), is(true)); - Throwable cause = socket.errors.pop(); assertThat("Error", cause, instanceOf(RuntimeException.class)); } diff --git a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java index cbc1a86ec0f..878aa6ce9f9 100644 --- a/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java +++ b/jetty-websocket/javax-websocket-tests/src/test/java/org/eclipse/jetty/websocket/javax/tests/server/JavaxWebSocketFrameHandler_OnMessage_TextStreamTest.java @@ -21,11 +21,10 @@ package org.eclipse.jetty.websocket.javax.tests.server; import java.io.IOException; import java.io.Reader; import java.net.URI; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; + import javax.websocket.OnMessage; -import javax.websocket.Session; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; @@ -37,8 +36,6 @@ import org.eclipse.jetty.websocket.core.OpCode; import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler; import org.eclipse.jetty.websocket.javax.common.UpgradeRequest; import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponse; -import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.javax.tests.WSEventTracker; import org.junit.jupiter.api.Test; @@ -51,11 +48,9 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac private T performOnMessageInvocation(T socket, Consumer func) throws Exception { UpgradeRequest request = new UpgradeRequestAdapter(URI.create("http://localhost:8080/msg/foo")); - UpgradeResponse response = new UpgradeResponseAdapter(); - CompletableFuture futureSession = new CompletableFuture<>(); // Establish endpoint function - JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession); + JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request); frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP); func.accept(frameHandler); return socket; diff --git a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index 1e6d719559f..6528a9b5d1e 100644 --- a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -43,7 +43,6 @@ import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; -import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketBehavior; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.client.impl.JettyClientUpgradeRequest; @@ -147,8 +146,21 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli }); } upgradeRequest.setConfiguration(configurationCustomizer); - coreClient.connect(upgradeRequest); - return upgradeRequest.getFutureSession(); + CompletableFuture futureSession = new CompletableFuture<>(); + + coreClient.connect(upgradeRequest).whenComplete((coreSession, error)-> + { + if (error != null) + { + futureSession.completeExceptionally(error); + return; + } + + JettyWebSocketFrameHandler frameHandler = (JettyWebSocketFrameHandler)upgradeRequest.getFrameHandler(); + futureSession.complete(frameHandler.getSession()); + }); + + return futureSession; } @Override @@ -312,10 +324,9 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli return sessionTracker.getSessions(); } - public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, - CompletableFuture futureSession) + public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo) { - return frameHandlerFactory.newJettyFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession); + return frameHandlerFactory.newJettyFrameHandler(websocketPojo); } /** diff --git a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/impl/JettyClientUpgradeRequest.java b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/impl/JettyClientUpgradeRequest.java index df1eeaa2fa6..cef7520d78d 100644 --- a/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/impl/JettyClientUpgradeRequest.java +++ b/jetty-websocket/jetty-websocket-client/src/main/java/org/eclipse/jetty/websocket/client/impl/JettyClientUpgradeRequest.java @@ -21,17 +21,15 @@ package org.eclipse.jetty.websocket.client.impl; import java.net.HttpCookie; import java.net.URI; import java.util.List; -import java.util.concurrent.CompletableFuture; import java.util.stream.Collectors; import org.eclipse.jetty.client.HttpResponse; +import org.eclipse.jetty.client.http.HttpConnectionOverHTTP; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpHeader; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; -import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.client.WebSocketClient; import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler; import org.eclipse.jetty.websocket.core.ExtensionConfig; @@ -42,17 +40,14 @@ import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient; public class JettyClientUpgradeRequest extends ClientUpgradeRequest { private final WebSocketClient containerContext; - private final Object websocketPojo; - private final CompletableFuture futureSession; private final DelegatedJettyClientUpgradeRequest handshakeRequest; + private final JettyWebSocketFrameHandler frameHandler; public JettyClientUpgradeRequest(WebSocketClient clientContainer, WebSocketCoreClient coreClient, UpgradeRequest request, URI requestURI, Object websocketPojo) { super(coreClient, requestURI); this.containerContext = clientContainer; - this.websocketPojo = websocketPojo; - this.futureSession = new CompletableFuture<>(); if (request != null) { @@ -90,6 +85,7 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest } handshakeRequest = new DelegatedJettyClientUpgradeRequest(this); + frameHandler = containerContext.newFrameHandler(websocketPojo); } @Override @@ -99,25 +95,17 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest handshakeRequest.configure(endp); } - protected void handleException(Throwable failure) + @Override + public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection) { - super.handleException(failure); - futureSession.completeExceptionally(failure); + frameHandler.setUpgradeRequest(new DelegatedJettyClientUpgradeRequest(this)); + frameHandler.setUpgradeResponse(new DelegatedJettyClientUpgradeResponse(response)); + super.upgrade(response, httpConnection); } @Override - public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response) + public FrameHandler getFrameHandler() { - UpgradeResponse upgradeResponse = new DelegatedJettyClientUpgradeResponse(response); - - JettyWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, - handshakeRequest, upgradeResponse, futureSession); - return frameHandler; } - - public CompletableFuture getFutureSession() - { - return futureSession; - } } diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java index 471daba8efc..f5a91bfc2f0 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandler.java @@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.common; import java.lang.invoke.MethodHandle; import java.nio.ByteBuffer; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; import org.eclipse.jetty.util.BufferUtil; @@ -28,7 +27,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.websocket.api.BatchMode; -import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.UpgradeRequest; import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException; @@ -67,15 +65,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler private MethodHandle frameHandle; private MethodHandle pingHandle; private MethodHandle pongHandle; - /** - * Immutable HandshakeRequest available via Session - */ - private final UpgradeRequest upgradeRequest; - /** - * Immutable HandshakeResponse available via Session - */ - private final UpgradeResponse upgradeResponse; - private final CompletableFuture futureSession; + private UpgradeRequest upgradeRequest; + private UpgradeResponse upgradeResponse; + private final Customizer customizer; private MessageSink textSink; private MessageSink binarySink; @@ -86,14 +78,12 @@ public class JettyWebSocketFrameHandler implements FrameHandler public JettyWebSocketFrameHandler(WebSocketContainer container, Object endpointInstance, - UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle, MethodHandle textHandle, MethodHandle binaryHandle, Class textSinkClass, Class binarySinkClass, MethodHandle frameHandle, MethodHandle pingHandle, MethodHandle pongHandle, - CompletableFuture futureSession, BatchMode batchMode, Customizer customizer) { @@ -101,8 +91,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler this.container = container; this.endpointInstance = endpointInstance; - this.upgradeRequest = upgradeRequest; - this.upgradeResponse = upgradeResponse; this.openHandle = openHandle; this.closeHandle = closeHandle; @@ -115,11 +103,35 @@ public class JettyWebSocketFrameHandler implements FrameHandler this.pingHandle = pingHandle; this.pongHandle = pongHandle; - this.futureSession = futureSession; this.batchMode = batchMode; this.customizer = customizer; } + public void setUpgradeRequest(UpgradeRequest upgradeRequest) + { + this.upgradeRequest = upgradeRequest; + } + + public void setUpgradeResponse(UpgradeResponse upgradeResponse) + { + this.upgradeResponse = upgradeResponse; + } + + public UpgradeRequest getUpgradeRequest() + { + return upgradeRequest; + } + + public UpgradeResponse getUpgradeResponse() + { + return upgradeResponse; + } + + public BatchMode getBatchMode() + { + return batchMode; + } + public WebSocketSession getSession() { return session; @@ -131,7 +143,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler try { customizer.customize(coreSession); - session = new WebSocketSession(coreSession, this, batchMode, upgradeRequest, upgradeResponse); + session = new WebSocketSession(coreSession, this); frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session); openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session); @@ -156,13 +168,11 @@ public class JettyWebSocketFrameHandler implements FrameHandler container.notifySessionListeners((listener) -> listener.onWebSocketSessionOpened(session)); callback.succeeded(); - futureSession.complete(session); demand(); } catch (Throwable cause) { callback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause)); - futureSession.completeExceptionally(cause); } } @@ -247,8 +257,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler try { cause = convertCause(cause); - futureSession.completeExceptionally(cause); - if (errorHandle != null) errorHandle.invoke(cause); else diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java index a7f2f863c1a..2522c0ccf10 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerFactory.java @@ -31,7 +31,6 @@ import java.lang.reflect.Modifier; import java.nio.ByteBuffer; import java.time.Duration; import java.util.Map; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executor; @@ -40,8 +39,6 @@ import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.Frame; import org.eclipse.jetty.websocket.api.InvalidWebSocketException; import org.eclipse.jetty.websocket.api.Session; -import org.eclipse.jetty.websocket.api.UpgradeRequest; -import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.WebSocketFrameListener; import org.eclipse.jetty.websocket.api.WebSocketListener; @@ -119,8 +116,7 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle throw new InvalidWebSocketException("Unrecognized WebSocket endpoint: " + endpointClass.getName()); } - public JettyWebSocketFrameHandler newJettyFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse, - CompletableFuture futureSession) + public JettyWebSocketFrameHandler newJettyFrameHandler(Object endpointInstance) { JettyWebSocketFrameHandlerMetadata metadata = getMetadata(endpointInstance.getClass()); @@ -145,19 +141,13 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle pingHandle = bindTo(pingHandle, endpointInstance); pongHandle = bindTo(pongHandle, endpointInstance); - CompletableFuture future = futureSession; - if (future == null) - future = new CompletableFuture<>(); - JettyWebSocketFrameHandler frameHandler = new JettyWebSocketFrameHandler( container, endpointInstance, - upgradeRequest, upgradeResponse, openHandle, closeHandle, errorHandle, textHandle, binaryHandle, textSinkClass, binarySinkClass, frameHandle, pingHandle, pongHandle, - future, batchMode, metadata); diff --git a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java index 817f6a6c2f2..686149e065d 100644 --- a/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java +++ b/jetty-websocket/jetty-websocket-common/src/main/java/org/eclipse/jetty/websocket/common/WebSocketSession.java @@ -28,7 +28,6 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle; import org.eclipse.jetty.util.component.Dumpable; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.websocket.api.BatchMode; import org.eclipse.jetty.websocket.api.CloseStatus; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; @@ -47,17 +46,13 @@ public class WebSocketSession extends AbstractLifeCycle implements Session, Susp private final UpgradeRequest upgradeRequest; private final UpgradeResponse upgradeResponse; - public WebSocketSession( - FrameHandler.CoreSession coreSession, - JettyWebSocketFrameHandler frameHandler, BatchMode batchMode, - UpgradeRequest upgradeRequest, - UpgradeResponse upgradeResponse) + public WebSocketSession(FrameHandler.CoreSession coreSession, JettyWebSocketFrameHandler frameHandler) { - this.coreSession = Objects.requireNonNull(coreSession); this.frameHandler = Objects.requireNonNull(frameHandler); - this.remoteEndpoint = new JettyWebSocketRemoteEndpoint(coreSession, batchMode); - this.upgradeRequest = upgradeRequest; - this.upgradeResponse = upgradeResponse; + this.coreSession = Objects.requireNonNull(coreSession); + this.upgradeRequest = frameHandler.getUpgradeRequest(); + this.upgradeResponse = frameHandler.getUpgradeResponse(); + this.remoteEndpoint = new JettyWebSocketRemoteEndpoint(coreSession, frameHandler.getBatchMode()); } @Override diff --git a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java index d8938ea7143..6ac2ec2d348 100644 --- a/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java +++ b/jetty-websocket/jetty-websocket-common/src/test/java/org/eclipse/jetty/websocket/common/JettyWebSocketFrameHandlerTest.java @@ -29,8 +29,6 @@ import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.IO; import org.eclipse.jetty.websocket.api.Session; import org.eclipse.jetty.websocket.api.StatusCode; -import org.eclipse.jetty.websocket.api.UpgradeRequest; -import org.eclipse.jetty.websocket.api.UpgradeResponse; import org.eclipse.jetty.websocket.api.WebSocketConnectionListener; import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage; import org.eclipse.jetty.websocket.api.annotations.WebSocket; @@ -38,8 +36,6 @@ import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerBasicSocke import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerFrameSocket; import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerPartialSocket; import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerPingPongSocket; -import org.eclipse.jetty.websocket.common.handshake.DummyUpgradeRequest; -import org.eclipse.jetty.websocket.common.handshake.DummyUpgradeResponse; import org.eclipse.jetty.websocket.core.Behavior; import org.eclipse.jetty.websocket.core.CloseStatus; import org.eclipse.jetty.websocket.core.Frame; @@ -82,10 +78,7 @@ public class JettyWebSocketFrameHandlerTest private JettyWebSocketFrameHandler newLocalFrameHandler(Object wsEndpoint) { - UpgradeRequest upgradeRequest = new DummyUpgradeRequest(); - UpgradeResponse upgradeResponse = new DummyUpgradeResponse(); - JettyWebSocketFrameHandler localEndpoint = endpointFactory.newJettyFrameHandler(wsEndpoint, - upgradeRequest, upgradeResponse, null); + JettyWebSocketFrameHandler localEndpoint = endpointFactory.newJettyFrameHandler(wsEndpoint); return localEndpoint; } diff --git a/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java b/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java index a348cffd212..fcce48a3801 100644 --- a/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java +++ b/jetty-websocket/jetty-websocket-server/src/main/java/org/eclipse/jetty/websocket/server/JettyServerFrameHandlerFactory.java @@ -18,8 +18,6 @@ package org.eclipse.jetty.websocket.server; -import java.util.concurrent.CompletableFuture; - import javax.servlet.ServletContext; import org.eclipse.jetty.server.handler.ContextHandler; @@ -28,8 +26,6 @@ import org.eclipse.jetty.util.component.LifeCycle; import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory; import org.eclipse.jetty.websocket.common.WebSocketContainer; import org.eclipse.jetty.websocket.core.FrameHandler; -import org.eclipse.jetty.websocket.server.internal.DelegatedJettyServletUpgradeRequest; -import org.eclipse.jetty.websocket.server.internal.UpgradeResponseAdapter; import org.eclipse.jetty.websocket.servlet.FrameHandlerFactory; import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest; import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse; @@ -52,8 +48,7 @@ public class JettyServerFrameHandlerFactory @Override public FrameHandler newFrameHandler(Object websocketPojo, ServletUpgradeRequest upgradeRequest, ServletUpgradeResponse upgradeResponse) { - return super.newJettyFrameHandler(websocketPojo, new DelegatedJettyServletUpgradeRequest(upgradeRequest), new UpgradeResponseAdapter(upgradeResponse), - new CompletableFuture<>()); + return super.newJettyFrameHandler(websocketPojo); } @Override diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java index b2e29cc563e..cea7185782e 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/core/client/ClientUpgradeRequest.java @@ -49,6 +49,7 @@ import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.B64Code; +import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.QuotedStringTokenizer; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.log.Log; @@ -73,7 +74,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon return new ClientUpgradeRequest(webSocketClient, requestURI) { @Override - public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response) + public FrameHandler getFrameHandler() { return frameHandler; } @@ -83,6 +84,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon private static final Logger LOG = Log.getLogger(ClientUpgradeRequest.class); protected final CompletableFuture futureCoreSession; private final WebSocketCoreClient wsClient; + private FrameHandler frameHandler; private FrameHandler.ConfigurationCustomizer customizer = new FrameHandler.ConfigurationCustomizer(); private List upgradeListeners = new ArrayList<>(); @@ -187,6 +189,17 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon @Override public void send(final Response.CompleteListener listener) { + try + { + frameHandler = getFrameHandler(); + if (frameHandler == null) + throw new IllegalArgumentException("FrameHandler could not be created"); + } + catch (Throwable t) + { + throw new IllegalArgumentException("FrameHandler could not be created", t); + } + initWebSocketHeaders(); super.send(listener); } @@ -224,19 +237,13 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon } Throwable failure = result.getFailure(); - if ((failure instanceof java.net.SocketException) || - (failure instanceof java.io.InterruptedIOException) || - (failure instanceof HttpResponseException) || - (failure instanceof UpgradeException)) - { - // handle as-is - handleException(failure); - } - else - { - // wrap in UpgradeException - handleException(new UpgradeException(requestURI, responseStatusCode, responseLine, failure)); - } + boolean wrapFailure = !((failure instanceof java.net.SocketException) || + (failure instanceof java.io.InterruptedIOException) || + (failure instanceof HttpResponseException) || + (failure instanceof UpgradeException)); + if (wrapFailure) + failure = new UpgradeException(requestURI, responseStatusCode, responseLine, failure); + handleException(failure); } if (responseStatusCode != HttpStatus.SWITCHING_PROTOCOLS_101) @@ -250,6 +257,17 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon protected void handleException(Throwable failure) { futureCoreSession.completeExceptionally(failure); + if (frameHandler != null) + { + try + { + frameHandler.onError(failure, Callback.NOOP); + } + catch (Throwable t) + { + LOG.warn("FrameHandler onError threw", t); + } + } } @SuppressWarnings("Duplicates") @@ -332,19 +350,6 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon EndPoint endp = httpConnection.getEndPoint(); customize(endp); - FrameHandler frameHandler = getFrameHandler(wsClient, response); - - if (frameHandler == null) - { - StringBuilder err = new StringBuilder(); - err.append("FrameHandler is null for request ").append(this.getURI().toASCIIString()); - if (negotiatedSubProtocol != null) - { - err.append(" [subprotocol: ").append(negotiatedSubProtocol).append("]"); - } - throw new WebSocketException(err.toString()); - } - Request request = response.getRequest(); Negotiated negotiated = new Negotiated( request.getURI(), @@ -396,7 +401,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated); } - public abstract FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response); + public abstract FrameHandler getFrameHandler(); private final String genRandomKey() {