Issue #3705 - WebSocket upgrade failure and CompletableFuture refactor

* Issue #3705 - notify WebSocket framehandler on client upgrade failure

getFrameHandler on the ClientUpgradeRequest no longer takes
the upgrade response, the response must be set later if it is
required by the framehandler implementation

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue #3705 - changes from review

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue #3705 - throw if FrameHandler could not be created

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* wip

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue #3705 - count down the onOpen latch in NetworkFuzzer

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>

* Issue #3705 - WebSocket Session CompletableFuture refactor

- sessionFutures for jetty and javax are now implemented using the
futureCoreSession which will occur after onOpen

- the request and response are set on the FrameHandler before the
upgrade

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan 2019-06-06 08:13:33 +10:00 committed by GitHub
parent 76051192a2
commit 62eedebd17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
23 changed files with 208 additions and 281 deletions

View File

@ -19,52 +19,41 @@
package org.eclipse.jetty.websocket.javax.client;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import javax.websocket.Session;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
public class JavaxClientUpgradeRequest extends ClientUpgradeRequest
{
private final JavaxWebSocketClientContainer containerContext;
private final Object websocketPojo;
private final CompletableFuture<Session> futureSession;
private final JavaxWebSocketFrameHandler frameHandler;
public JavaxClientUpgradeRequest(JavaxWebSocketClientContainer clientContainer, WebSocketCoreClient coreClient, URI requestURI, Object websocketPojo)
{
super(coreClient, requestURI);
this.containerContext = clientContainer;
this.websocketPojo = websocketPojo;
this.futureSession = new CompletableFuture<>();
}
@Override
protected void handleException(Throwable failure)
{
super.handleException(failure);
futureSession.completeExceptionally(failure);
}
@Override
public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response)
{
UpgradeRequest upgradeRequest = new DelegatedJavaxClientUpgradeRequest(this);
UpgradeResponse upgradeResponse = new DelegatedJavaxClientUpgradeResponse(response);
frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest);
}
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession);
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
{
frameHandler.setUpgradeRequest(new DelegatedJavaxClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJavaxClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
}
@Override
public FrameHandler getFrameHandler()
{
return frameHandler;
}
public CompletableFuture<Session> getFutureSession()
{
return futureSession;
}
}

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.websocket.javax.common.ConfiguredEndpoint;
import org.eclipse.jetty.websocket.javax.common.InvalidWebSocketException;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketExtensionConfig;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactory;
/**
@ -106,17 +107,29 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
private CompletableFuture<Session> connect(JavaxClientUpgradeRequest upgradeRequest)
{
upgradeRequest.setConfiguration(defaultCustomizer);
CompletableFuture<Session> fut = upgradeRequest.getFutureSession();
CompletableFuture<Session> futureSession = new CompletableFuture<>();
try
{
getWebSocketCoreClient().connect(upgradeRequest);
return fut;
WebSocketCoreClient coreClient = getWebSocketCoreClient();
coreClient.connect(upgradeRequest).whenComplete((coreSession, error)->
{
if (error != null)
{
futureSession.completeExceptionally(error);
return;
}
JavaxWebSocketFrameHandler frameHandler = (JavaxWebSocketFrameHandler)upgradeRequest.getFrameHandler();
futureSession.complete(frameHandler.getSession());
});
}
catch (Exception e)
{
fut.completeExceptionally(e);
return fut;
futureSession.completeExceptionally(e);
}
return futureSession;
}
private Session connect(ConfiguredEndpoint configuredEndpoint, URI destURI) throws IOException
@ -141,7 +154,7 @@ public class JavaxWebSocketClientContainer extends JavaxWebSocketContainer imple
upgradeRequest.setSubProtocols(clientEndpointConfig.getPreferredSubprotocols());
}
long timeout = coreClient.getHttpClient().getConnectTimeout();
long timeout = getWebSocketCoreClient().getHttpClient().getConnectTimeout();
try
{
Future<Session> sessionFuture = connect(upgradeRequest);

View File

@ -23,12 +23,10 @@ import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;
import javax.websocket.Extension;
import javax.websocket.Session;
import javax.websocket.WebSocketContainer;
import org.eclipse.jetty.io.ByteBufferPool;
@ -156,10 +154,9 @@ public abstract class JavaxWebSocketContainer extends ContainerLifeCycle impleme
return sessionTracker.getSessions();
}
public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
CompletableFuture<Session> futureSession)
public JavaxWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest)
{
return getFrameHandlerFactory().newJavaxWebSocketFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession);
return getFrameHandlerFactory().newJavaxWebSocketFrameHandler(websocketPojo, upgradeRequest);
}
/**

View File

@ -27,7 +27,6 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import javax.websocket.CloseReason;
@ -35,7 +34,6 @@ import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.PongMessage;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -96,17 +94,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
private JavaxWebSocketFrameHandlerMetadata.MessageMetadata binaryMetadata;
// TODO: need pingHandle ?
private MethodHandle pongHandle;
/**
* Immutable HandshakeRequest available via Session
*/
private final UpgradeRequest upgradeRequest;
/**
* Immutable javax.websocket.HandshakeResponse available via Session
*/
private final UpgradeResponse upgradeResponse;
private final String id;
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
private final EndpointConfig endpointConfig;
private final CompletableFuture<Session> futureSession;
private MessageSink textSink;
private MessageSink binarySink;
private MessageSink activeMessageSink;
@ -118,14 +110,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
public JavaxWebSocketFrameHandler(JavaxWebSocketContainer container,
Object endpointInstance,
UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle,
JavaxWebSocketFrameHandlerMetadata.MessageMetadata textMetadata,
JavaxWebSocketFrameHandlerMetadata.MessageMetadata binaryMetadata,
MethodHandle pongHandle,
String id,
EndpointConfig endpointConfig,
CompletableFuture<Session> futureSession)
EndpointConfig endpointConfig)
{
this.LOG = Log.getLogger(endpointInstance.getClass());
@ -137,8 +126,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
throw oops;
}
this.endpointInstance = endpointInstance;
this.upgradeRequest = upgradeRequest;
this.upgradeResponse = upgradeResponse;
this.openHandle = openHandle;
this.closeHandle = closeHandle;
@ -147,9 +134,7 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
this.binaryMetadata = binaryMetadata;
this.pongHandle = pongHandle;
this.id = id;
this.endpointConfig = endpointConfig;
this.futureSession = futureSession;
this.messageHandlerMap = new HashMap<>();
}
@ -174,7 +159,7 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
try
{
this.coreSession = coreSession;
session = new JavaxWebSocketSession(container, coreSession, this, upgradeRequest.getUserPrincipal(), id, endpointConfig);
session = new JavaxWebSocketSession(container, coreSession, this, endpointConfig);
openHandle = InvokerUtils.bindTo(openHandle, session, endpointConfig);
closeHandle = InvokerUtils.bindTo(closeHandle, session);
@ -214,13 +199,11 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
container.notifySessionListeners((listener) -> listener.onJavaxWebSocketSessionOpened(session));
callback.succeeded();
futureSession.complete(session);
}
catch (Throwable cause)
{
Exception wse = new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause);
callback.failed(wse);
futureSession.completeExceptionally(wse);
}
}
@ -281,8 +264,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
{
try
{
futureSession.completeExceptionally(cause);
if (errorHandle != null)
errorHandle.invoke(cause);
else
@ -294,7 +275,6 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
WebSocketException wsError = new WebSocketException(endpointInstance.getClass().getSimpleName() + " ERROR method error: " + cause.getMessage(), t);
wsError.addSuppressed(cause);
callback.failed(wsError);
// TODO should futureSession be failed here?
}
}
@ -630,4 +610,24 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
throw new ProtocolException("Unable to process continuation during dataType " + dataType);
}
}
public void setUpgradeRequest(UpgradeRequest upgradeRequest)
{
this.upgradeRequest = upgradeRequest;
}
public void setUpgradeResponse(UpgradeResponse upgradeResponse)
{
this.upgradeResponse = upgradeResponse;
}
public UpgradeRequest getUpgradeRequest()
{
return upgradeRequest;
}
public UpgradeResponse getUpgradeResponse()
{
return upgradeResponse;
}
}

View File

@ -31,7 +31,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import javax.websocket.CloseReason;
@ -108,9 +107,7 @@ public abstract class JavaxWebSocketFrameHandlerFactory
public abstract JavaxWebSocketFrameHandlerMetadata createMetadata(Class<?> endpointClass, EndpointConfig endpointConfig);
public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest,
UpgradeResponse upgradeResponse,
CompletableFuture<Session> futureSession)
public JavaxWebSocketFrameHandler newJavaxWebSocketFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest)
{
Object endpoint;
EndpointConfig config;
@ -162,22 +159,13 @@ public abstract class JavaxWebSocketFrameHandlerFactory
errorHandle = InvokerUtils.bindTo(errorHandle, endpoint);
pongHandle = InvokerUtils.bindTo(pongHandle, endpoint);
CompletableFuture<Session> future = futureSession;
if (future == null)
future = new CompletableFuture<>();
String id = upgradeRequest.toString();
JavaxWebSocketFrameHandler frameHandler = new JavaxWebSocketFrameHandler(
container,
endpoint,
upgradeRequest, upgradeResponse,
openHandle, closeHandle, errorHandle,
textMetadata, binaryMetadata,
pongHandle,
id,
config,
future);
config);
return frameHandler;
}

View File

@ -61,10 +61,8 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
protected final SharedBlockingCallback blocking = new SharedBlockingCallback();
private final JavaxWebSocketContainer container;
private final FrameHandler.CoreSession coreSession;
private final Principal principal;
private final JavaxWebSocketFrameHandler frameHandler;
private final EndpointConfig config;
private final String id;
private final AvailableDecoders availableDecoders;
private final AvailableEncoders availableEncoders;
private final Map<String, String> pathParameters;
@ -77,15 +75,11 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
public JavaxWebSocketSession(JavaxWebSocketContainer container,
FrameHandler.CoreSession coreSession,
JavaxWebSocketFrameHandler frameHandler,
Principal upgradeRequestPrincipal,
String id,
EndpointConfig endpointConfig)
{
this.container = container;
this.coreSession = coreSession;
this.frameHandler = frameHandler;
this.principal = upgradeRequestPrincipal;
this.id = id;
this.config = endpointConfig == null?new BasicEndpointConfig():endpointConfig;
@ -139,7 +133,6 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
}
frameHandler.addMessageHandler(this, clazz, handler);
}
/**
@ -308,7 +301,7 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
@Override
public String getId()
{
return this.id;
return this.frameHandler.getUpgradeRequest().toString();
}
/**
@ -516,7 +509,7 @@ public class JavaxWebSocketSession extends AbstractLifeCycle implements javax.we
@Override
public Principal getUserPrincipal()
{
return this.principal;
return this.frameHandler.getUpgradeRequest().getUserPrincipal();
}
/**

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.javax.common;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import javax.websocket.EndpointConfig;
@ -67,10 +66,8 @@ public abstract class AbstractJavaxWebSocketFrameHandlerTest
BasicEndpointConfig config = new BasicEndpointConfig();
ConfiguredEndpoint endpoint = new ConfiguredEndpoint(websocket, config);
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
UpgradeResponse upgradeResponse = new UpgradeResponseAdapter();
JavaxWebSocketFrameHandler localEndpoint = factory.newJavaxWebSocketFrameHandler(endpoint,
upgradeRequest, upgradeResponse, new CompletableFuture<>());
JavaxWebSocketFrameHandler localEndpoint = factory.newJavaxWebSocketFrameHandler(endpoint, upgradeRequest);
return localEndpoint;
}

View File

@ -38,18 +38,9 @@ public abstract class AbstractSessionTest
container.start();
Object websocketPojo = new DummyEndpoint();
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
UpgradeResponse upgradeResponse = new UpgradeResponseAdapter();
JavaxWebSocketFrameHandler frameHandler =
container.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, null);
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest);
FrameHandler.CoreSession coreSession = new FrameHandler.CoreSession.Empty();
String id = "dummy";
EndpointConfig endpointConfig = null;
session = new JavaxWebSocketSession(container,
coreSession,
frameHandler,
null,
id,
endpointConfig);
session = new JavaxWebSocketSession(container, coreSession, frameHandler, null);
container.addManaged(session);
}

View File

@ -18,11 +18,8 @@
package org.eclipse.jetty.websocket.javax.server;
import java.util.concurrent.CompletableFuture;
import javax.websocket.Endpoint;
import javax.websocket.EndpointConfig;
import javax.websocket.Session;
import javax.websocket.server.ServerEndpoint;
import org.eclipse.jetty.http.pathmap.UriTemplatePathSpec;
@ -32,7 +29,6 @@ import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactor
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerMetadata;
import org.eclipse.jetty.websocket.javax.server.internal.DelegatedJavaxServletUpgradeRequest;
import org.eclipse.jetty.websocket.javax.server.internal.PathParamIdentifier;
import org.eclipse.jetty.websocket.javax.server.internal.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.servlet.FrameHandlerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@ -68,7 +64,6 @@ public class JavaxWebSocketServerFrameHandlerFactory extends JavaxWebSocketFrame
@Override
public FrameHandler newFrameHandler(Object websocketPojo, ServletUpgradeRequest upgradeRequest, ServletUpgradeResponse upgradeResponse)
{
CompletableFuture<Session> completableFuture = new CompletableFuture<>();
return newJavaxWebSocketFrameHandler(websocketPojo, new DelegatedJavaxServletUpgradeRequest(upgradeRequest), new UpgradeResponseAdapter(upgradeResponse), completableFuture);
return newJavaxWebSocketFrameHandler(websocketPojo, new DelegatedJavaxServletUpgradeRequest(upgradeRequest));
}
}

View File

@ -25,10 +25,10 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
@ -42,6 +42,8 @@ import org.eclipse.jetty.websocket.core.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
import org.eclipse.jetty.websocket.core.internal.Generator;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseable
{
private final LocalServer server;
@ -66,8 +68,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
super();
this.server = server;
this.client = new WebSocketCoreClient();
CompletableFuture<FrameCapture> futureOnCapture = new CompletableFuture<>();
this.upgradeRequest = new RawUpgradeRequest(client, wsURI, futureOnCapture);
this.upgradeRequest = new RawUpgradeRequest(client, wsURI);
if (requestHeaders != null)
{
HttpFields fields = this.upgradeRequest.getHeaders();
@ -81,7 +82,7 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
this.generator = new UnitGenerator(Behavior.CLIENT);
CompletableFuture<FrameHandler.CoreSession> futureHandler = this.client.connect(upgradeRequest);
CompletableFuture<FrameCapture> futureCapture = futureHandler.thenCombine(futureOnCapture, (session, capture) -> capture);
CompletableFuture<FrameCapture> futureCapture = futureHandler.thenCombine(upgradeRequest.getFuture(), (session, capture) -> capture);
this.frameCapture = futureCapture.get(10, TimeUnit.SECONDS);
}
@ -186,27 +187,31 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
public static class RawUpgradeRequest extends ClientUpgradeRequest
{
private final FrameCapture frameCapture = new FrameCapture();
private final CompletableFuture<FrameCapture> futureCapture;
private EndPoint endPoint;
public RawUpgradeRequest(WebSocketCoreClient webSocketClient, URI requestURI, CompletableFuture<FrameCapture> futureCapture)
public RawUpgradeRequest(WebSocketCoreClient webSocketClient, URI requestURI)
{
super(webSocketClient, requestURI);
this.futureCapture = futureCapture;
this.futureCapture = new CompletableFuture<>();
}
public CompletableFuture<FrameCapture> getFuture()
{
return futureCapture;
}
@Override
public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response)
public FrameHandler getFrameHandler()
{
FrameCapture frameCapture = new FrameCapture(this.endPoint);
futureCapture.complete(frameCapture);
return frameCapture;
}
@Override
protected void customize(EndPoint endp)
{
this.endPoint = endp;
frameCapture.setEndPoint(endp);
futureCapture.complete(frameCapture);
}
@Override
@ -220,20 +225,21 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
public static class FrameCapture implements FrameHandler
{
private final BlockingQueue<Frame> receivedFrames = new LinkedBlockingQueue<>();
private final EndPoint endPoint;
private EndPoint endPoint;
private CountDownLatch openLatch = new CountDownLatch(1);
private final SharedBlockingCallback blockingCallback = new SharedBlockingCallback();
private CoreSession coreSession;
public FrameCapture(EndPoint endPoint)
public void setEndPoint(EndPoint endpoint)
{
this.endPoint = endPoint;
this.endPoint = endpoint;
}
@Override
public void onOpen(CoreSession coreSession, Callback callback)
{
this.coreSession = coreSession;
this.openLatch.countDown();
callback.succeeded();
}
@ -256,14 +262,22 @@ public class NetworkFuzzer extends Fuzzer.Adapter implements Fuzzer, AutoCloseab
callback.succeeded();
}
public void writeRaw(ByteBuffer buffer) throws IOException
{
try
{
assertTrue(openLatch.await(1, TimeUnit.SECONDS));
}
catch (InterruptedException e)
{
throw new IOException(e);
}
synchronized (this)
{
try (SharedBlockingCallback.Blocker blocker = blockingCallback.acquire())
{
this.endPoint.write(blocker, buffer);
endPoint.write(blocker, buffer);
}
}
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.javax.tests.client;
import javax.websocket.EndpointConfig;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketContainer;
@ -27,8 +25,6 @@ import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.tests.DummyEndpoint;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@ -45,18 +41,9 @@ public abstract class AbstractClientSessionTest
container.start();
Object websocketPojo = new DummyEndpoint();
UpgradeRequest upgradeRequest = new UpgradeRequestAdapter();
UpgradeResponse upgradeResponse = new UpgradeResponseAdapter();
JavaxWebSocketFrameHandler frameHandler =
container.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, null);
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(websocketPojo, upgradeRequest);
FrameHandler.CoreSession coreSession = new FrameHandler.CoreSession.Empty();
String id = "dummy";
EndpointConfig endpointConfig = null;
session = new JavaxWebSocketSession(container,
coreSession,
frameHandler,
null,
id,
endpointConfig);
session = new JavaxWebSocketSession(container, coreSession, frameHandler, null);
container.addManaged(session);
}

View File

@ -20,11 +20,10 @@ package org.eclipse.jetty.websocket.javax.tests.client;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.stream.Stream;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.Session;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.core.CloseStatus;
@ -34,8 +33,6 @@ import org.eclipse.jetty.websocket.javax.client.JavaxWebSocketClientContainer;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSessionSocket;
import org.eclipse.jetty.websocket.javax.tests.client.samples.CloseReasonSocket;
@ -107,10 +104,7 @@ public class OnCloseTest
container.start();
UpgradeRequest request = new UpgradeRequestAdapter();
UpgradeResponse response = new UpgradeResponseAdapter();
CompletableFuture<Session> futureSession = new CompletableFuture<>();
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request, response, futureSession);
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(endpoint, request);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Execute onClose call

View File

@ -21,12 +21,10 @@ package org.eclipse.jetty.websocket.javax.tests.client;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import javax.websocket.ClientEndpoint;
import javax.websocket.ClientEndpointConfig;
import javax.websocket.MessageHandler;
import javax.websocket.Session;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
@ -42,8 +40,6 @@ import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandlerFactor
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketSession;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.tests.MessageType;
import org.eclipse.jetty.websocket.javax.tests.SessionMatchers;
import org.eclipse.jetty.websocket.javax.tests.handlers.ByteArrayWholeHandler;
@ -80,11 +76,9 @@ public class SessionAddMessageHandlerTest
ConfiguredEndpoint ei = new ConfiguredEndpoint(new DummyEndpoint(), endpointConfig);
UpgradeRequest handshakeRequest = new UpgradeRequestAdapter();
UpgradeResponse handshakeResponse = new UpgradeResponseAdapter();
JavaxWebSocketFrameHandlerFactory frameHandlerFactory = new JavaxWebSocketClientFrameHandlerFactory(container);
CompletableFuture<Session> futureSession = new CompletableFuture<>();
frameHandler = frameHandlerFactory.newJavaxWebSocketFrameHandler(ei, handshakeRequest, handshakeResponse, futureSession);
frameHandler = frameHandlerFactory.newJavaxWebSocketFrameHandler(ei, handshakeRequest);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
// Session

View File

@ -34,7 +34,6 @@ import org.junit.jupiter.api.Test;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.instanceOf;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertThrows;
public class MisbehavingClassTest
{
@ -62,11 +61,9 @@ public class MisbehavingClassTest
try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class))
{
// Expecting CloseException during onOpen().
assertThrows(CloseException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()");
// expecting RuntimeException during onOpen
container.connectToServer(socket, server.getWsUri());
assertThat("Close should have occurred", socket.closeLatch.await(1, TimeUnit.SECONDS), is(true));
Throwable cause = socket.errors.pop();
assertThat("Error", cause, instanceOf(RuntimeException.class));
}
@ -81,11 +78,9 @@ public class MisbehavingClassTest
try (StacklessLogging ignored = new StacklessLogging(WebSocketCoreSession.class))
{
// Expecting CloseException during onOpen().
assertThrows(CloseException.class, () -> container.connectToServer(socket, server.getWsUri()), "Should have failed .connectToServer()");
// expecting RuntimeException during onOpen
container.connectToServer(socket, server.getWsUri());
assertThat("Close should have occurred", socket.closeLatch.await(5, TimeUnit.SECONDS), is(true));
Throwable cause = socket.errors.pop();
assertThat("Error", cause, instanceOf(RuntimeException.class));
}

View File

@ -21,11 +21,10 @@ package org.eclipse.jetty.websocket.javax.tests.server;
import java.io.IOException;
import java.io.Reader;
import java.net.URI;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import javax.websocket.OnMessage;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@ -37,8 +36,6 @@ import org.eclipse.jetty.websocket.core.OpCode;
import org.eclipse.jetty.websocket.javax.common.JavaxWebSocketFrameHandler;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequest;
import org.eclipse.jetty.websocket.javax.common.UpgradeRequestAdapter;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponse;
import org.eclipse.jetty.websocket.javax.common.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.javax.tests.WSEventTracker;
import org.junit.jupiter.api.Test;
@ -51,11 +48,9 @@ public class JavaxWebSocketFrameHandler_OnMessage_TextStreamTest extends Abstrac
private <T extends WSEventTracker> T performOnMessageInvocation(T socket, Consumer<JavaxWebSocketFrameHandler> func) throws Exception
{
UpgradeRequest request = new UpgradeRequestAdapter(URI.create("http://localhost:8080/msg/foo"));
UpgradeResponse response = new UpgradeResponseAdapter();
CompletableFuture<Session> futureSession = new CompletableFuture<>();
// Establish endpoint function
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request, response, futureSession);
JavaxWebSocketFrameHandler frameHandler = container.newFrameHandler(socket, request);
frameHandler.onOpen(new FrameHandler.CoreSession.Empty(), Callback.NOOP);
func.accept(frameHandler);
return socket;

View File

@ -43,7 +43,6 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketBehavior;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.impl.JettyClientUpgradeRequest;
@ -147,8 +146,21 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
});
}
upgradeRequest.setConfiguration(configurationCustomizer);
coreClient.connect(upgradeRequest);
return upgradeRequest.getFutureSession();
CompletableFuture<Session> futureSession = new CompletableFuture<>();
coreClient.connect(upgradeRequest).whenComplete((coreSession, error)->
{
if (error != null)
{
futureSession.completeExceptionally(error);
return;
}
JettyWebSocketFrameHandler frameHandler = (JettyWebSocketFrameHandler)upgradeRequest.getFrameHandler();
futureSession.complete(frameHandler.getSession());
});
return futureSession;
}
@Override
@ -312,10 +324,9 @@ public class WebSocketClient extends ContainerLifeCycle implements WebSocketPoli
return sessionTracker.getSessions();
}
public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
CompletableFuture<Session> futureSession)
public JettyWebSocketFrameHandler newFrameHandler(Object websocketPojo)
{
return frameHandlerFactory.newJettyFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession);
return frameHandlerFactory.newJettyFrameHandler(websocketPojo);
}
/**

View File

@ -21,17 +21,15 @@ package org.eclipse.jetty.websocket.client.impl;
import java.net.HttpCookie;
import java.net.URI;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
import org.eclipse.jetty.client.HttpResponse;
import org.eclipse.jetty.client.http.HttpConnectionOverHTTP;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpHeader;
import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandler;
import org.eclipse.jetty.websocket.core.ExtensionConfig;
@ -42,17 +40,14 @@ import org.eclipse.jetty.websocket.core.client.WebSocketCoreClient;
public class JettyClientUpgradeRequest extends ClientUpgradeRequest
{
private final WebSocketClient containerContext;
private final Object websocketPojo;
private final CompletableFuture<Session> futureSession;
private final DelegatedJettyClientUpgradeRequest handshakeRequest;
private final JettyWebSocketFrameHandler frameHandler;
public JettyClientUpgradeRequest(WebSocketClient clientContainer, WebSocketCoreClient coreClient, UpgradeRequest request,
URI requestURI, Object websocketPojo)
{
super(coreClient, requestURI);
this.containerContext = clientContainer;
this.websocketPojo = websocketPojo;
this.futureSession = new CompletableFuture<>();
if (request != null)
{
@ -90,6 +85,7 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
}
handshakeRequest = new DelegatedJettyClientUpgradeRequest(this);
frameHandler = containerContext.newFrameHandler(websocketPojo);
}
@Override
@ -99,25 +95,17 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
handshakeRequest.configure(endp);
}
protected void handleException(Throwable failure)
@Override
public void upgrade(HttpResponse response, HttpConnectionOverHTTP httpConnection)
{
super.handleException(failure);
futureSession.completeExceptionally(failure);
frameHandler.setUpgradeRequest(new DelegatedJettyClientUpgradeRequest(this));
frameHandler.setUpgradeResponse(new DelegatedJettyClientUpgradeResponse(response));
super.upgrade(response, httpConnection);
}
@Override
public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response)
public FrameHandler getFrameHandler()
{
UpgradeResponse upgradeResponse = new DelegatedJettyClientUpgradeResponse(response);
JettyWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo,
handshakeRequest, upgradeResponse, futureSession);
return frameHandler;
}
public CompletableFuture<Session> getFutureSession()
{
return futureSession;
}
}

View File

@ -20,7 +20,6 @@ package org.eclipse.jetty.websocket.common;
import java.lang.invoke.MethodHandle;
import java.nio.ByteBuffer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import org.eclipse.jetty.util.BufferUtil;
@ -28,7 +27,6 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.common.invoke.InvalidSignatureException;
@ -67,15 +65,9 @@ public class JettyWebSocketFrameHandler implements FrameHandler
private MethodHandle frameHandle;
private MethodHandle pingHandle;
private MethodHandle pongHandle;
/**
* Immutable HandshakeRequest available via Session
*/
private final UpgradeRequest upgradeRequest;
/**
* Immutable HandshakeResponse available via Session
*/
private final UpgradeResponse upgradeResponse;
private final CompletableFuture<Session> futureSession;
private UpgradeRequest upgradeRequest;
private UpgradeResponse upgradeResponse;
private final Customizer customizer;
private MessageSink textSink;
private MessageSink binarySink;
@ -86,14 +78,12 @@ public class JettyWebSocketFrameHandler implements FrameHandler
public JettyWebSocketFrameHandler(WebSocketContainer container,
Object endpointInstance,
UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
MethodHandle openHandle, MethodHandle closeHandle, MethodHandle errorHandle,
MethodHandle textHandle, MethodHandle binaryHandle,
Class<? extends MessageSink> textSinkClass,
Class<? extends MessageSink> binarySinkClass,
MethodHandle frameHandle,
MethodHandle pingHandle, MethodHandle pongHandle,
CompletableFuture<Session> futureSession,
BatchMode batchMode,
Customizer customizer)
{
@ -101,8 +91,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler
this.container = container;
this.endpointInstance = endpointInstance;
this.upgradeRequest = upgradeRequest;
this.upgradeResponse = upgradeResponse;
this.openHandle = openHandle;
this.closeHandle = closeHandle;
@ -115,11 +103,35 @@ public class JettyWebSocketFrameHandler implements FrameHandler
this.pingHandle = pingHandle;
this.pongHandle = pongHandle;
this.futureSession = futureSession;
this.batchMode = batchMode;
this.customizer = customizer;
}
public void setUpgradeRequest(UpgradeRequest upgradeRequest)
{
this.upgradeRequest = upgradeRequest;
}
public void setUpgradeResponse(UpgradeResponse upgradeResponse)
{
this.upgradeResponse = upgradeResponse;
}
public UpgradeRequest getUpgradeRequest()
{
return upgradeRequest;
}
public UpgradeResponse getUpgradeResponse()
{
return upgradeResponse;
}
public BatchMode getBatchMode()
{
return batchMode;
}
public WebSocketSession getSession()
{
return session;
@ -131,7 +143,7 @@ public class JettyWebSocketFrameHandler implements FrameHandler
try
{
customizer.customize(coreSession);
session = new WebSocketSession(coreSession, this, batchMode, upgradeRequest, upgradeResponse);
session = new WebSocketSession(coreSession, this);
frameHandle = JettyWebSocketFrameHandlerFactory.bindTo(frameHandle, session);
openHandle = JettyWebSocketFrameHandlerFactory.bindTo(openHandle, session);
@ -156,13 +168,11 @@ public class JettyWebSocketFrameHandler implements FrameHandler
container.notifySessionListeners((listener) -> listener.onWebSocketSessionOpened(session));
callback.succeeded();
futureSession.complete(session);
demand();
}
catch (Throwable cause)
{
callback.failed(new WebSocketException(endpointInstance.getClass().getSimpleName() + " OPEN method error: " + cause.getMessage(), cause));
futureSession.completeExceptionally(cause);
}
}
@ -247,8 +257,6 @@ public class JettyWebSocketFrameHandler implements FrameHandler
try
{
cause = convertCause(cause);
futureSession.completeExceptionally(cause);
if (errorHandle != null)
errorHandle.invoke(cause);
else

View File

@ -31,7 +31,6 @@ import java.lang.reflect.Modifier;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;
@ -40,8 +39,6 @@ import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.Frame;
import org.eclipse.jetty.websocket.api.InvalidWebSocketException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.WebSocketFrameListener;
import org.eclipse.jetty.websocket.api.WebSocketListener;
@ -119,8 +116,7 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
throw new InvalidWebSocketException("Unrecognized WebSocket endpoint: " + endpointClass.getName());
}
public JettyWebSocketFrameHandler newJettyFrameHandler(Object endpointInstance, UpgradeRequest upgradeRequest, UpgradeResponse upgradeResponse,
CompletableFuture<Session> futureSession)
public JettyWebSocketFrameHandler newJettyFrameHandler(Object endpointInstance)
{
JettyWebSocketFrameHandlerMetadata metadata = getMetadata(endpointInstance.getClass());
@ -145,19 +141,13 @@ public class JettyWebSocketFrameHandlerFactory extends ContainerLifeCycle
pingHandle = bindTo(pingHandle, endpointInstance);
pongHandle = bindTo(pongHandle, endpointInstance);
CompletableFuture<Session> future = futureSession;
if (future == null)
future = new CompletableFuture<>();
JettyWebSocketFrameHandler frameHandler = new JettyWebSocketFrameHandler(
container,
endpointInstance,
upgradeRequest, upgradeResponse,
openHandle, closeHandle, errorHandle,
textHandle, binaryHandle,
textSinkClass, binarySinkClass,
frameHandle, pingHandle, pongHandle,
future,
batchMode,
metadata);

View File

@ -28,7 +28,6 @@ import org.eclipse.jetty.util.component.AbstractLifeCycle;
import org.eclipse.jetty.util.component.Dumpable;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.BatchMode;
import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
@ -47,17 +46,13 @@ public class WebSocketSession extends AbstractLifeCycle implements Session, Susp
private final UpgradeRequest upgradeRequest;
private final UpgradeResponse upgradeResponse;
public WebSocketSession(
FrameHandler.CoreSession coreSession,
JettyWebSocketFrameHandler frameHandler, BatchMode batchMode,
UpgradeRequest upgradeRequest,
UpgradeResponse upgradeResponse)
public WebSocketSession(FrameHandler.CoreSession coreSession, JettyWebSocketFrameHandler frameHandler)
{
this.coreSession = Objects.requireNonNull(coreSession);
this.frameHandler = Objects.requireNonNull(frameHandler);
this.remoteEndpoint = new JettyWebSocketRemoteEndpoint(coreSession, batchMode);
this.upgradeRequest = upgradeRequest;
this.upgradeResponse = upgradeResponse;
this.coreSession = Objects.requireNonNull(coreSession);
this.upgradeRequest = frameHandler.getUpgradeRequest();
this.upgradeResponse = frameHandler.getUpgradeResponse();
this.remoteEndpoint = new JettyWebSocketRemoteEndpoint(coreSession, frameHandler.getBatchMode());
}
@Override

View File

@ -29,8 +29,6 @@ import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnectionListener;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
@ -38,8 +36,6 @@ import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerBasicSocke
import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerFrameSocket;
import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerPartialSocket;
import org.eclipse.jetty.websocket.common.endpoints.listeners.ListenerPingPongSocket;
import org.eclipse.jetty.websocket.common.handshake.DummyUpgradeRequest;
import org.eclipse.jetty.websocket.common.handshake.DummyUpgradeResponse;
import org.eclipse.jetty.websocket.core.Behavior;
import org.eclipse.jetty.websocket.core.CloseStatus;
import org.eclipse.jetty.websocket.core.Frame;
@ -82,10 +78,7 @@ public class JettyWebSocketFrameHandlerTest
private JettyWebSocketFrameHandler newLocalFrameHandler(Object wsEndpoint)
{
UpgradeRequest upgradeRequest = new DummyUpgradeRequest();
UpgradeResponse upgradeResponse = new DummyUpgradeResponse();
JettyWebSocketFrameHandler localEndpoint = endpointFactory.newJettyFrameHandler(wsEndpoint,
upgradeRequest, upgradeResponse, null);
JettyWebSocketFrameHandler localEndpoint = endpointFactory.newJettyFrameHandler(wsEndpoint);
return localEndpoint;
}

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.websocket.server;
import java.util.concurrent.CompletableFuture;
import javax.servlet.ServletContext;
import org.eclipse.jetty.server.handler.ContextHandler;
@ -28,8 +26,6 @@ import org.eclipse.jetty.util.component.LifeCycle;
import org.eclipse.jetty.websocket.common.JettyWebSocketFrameHandlerFactory;
import org.eclipse.jetty.websocket.common.WebSocketContainer;
import org.eclipse.jetty.websocket.core.FrameHandler;
import org.eclipse.jetty.websocket.server.internal.DelegatedJettyServletUpgradeRequest;
import org.eclipse.jetty.websocket.server.internal.UpgradeResponseAdapter;
import org.eclipse.jetty.websocket.servlet.FrameHandlerFactory;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeRequest;
import org.eclipse.jetty.websocket.servlet.ServletUpgradeResponse;
@ -52,8 +48,7 @@ public class JettyServerFrameHandlerFactory
@Override
public FrameHandler newFrameHandler(Object websocketPojo, ServletUpgradeRequest upgradeRequest, ServletUpgradeResponse upgradeResponse)
{
return super.newJettyFrameHandler(websocketPojo, new DelegatedJettyServletUpgradeRequest(upgradeRequest), new UpgradeResponseAdapter(upgradeResponse),
new CompletableFuture<>());
return super.newJettyFrameHandler(websocketPojo);
}
@Override

View File

@ -49,6 +49,7 @@ import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
@ -73,7 +74,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
return new ClientUpgradeRequest(webSocketClient, requestURI)
{
@Override
public FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response)
public FrameHandler getFrameHandler()
{
return frameHandler;
}
@ -83,6 +84,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
private static final Logger LOG = Log.getLogger(ClientUpgradeRequest.class);
protected final CompletableFuture<FrameHandler.CoreSession> futureCoreSession;
private final WebSocketCoreClient wsClient;
private FrameHandler frameHandler;
private FrameHandler.ConfigurationCustomizer customizer = new FrameHandler.ConfigurationCustomizer();
private List<UpgradeListener> upgradeListeners = new ArrayList<>();
@ -187,6 +189,17 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
@Override
public void send(final Response.CompleteListener listener)
{
try
{
frameHandler = getFrameHandler();
if (frameHandler == null)
throw new IllegalArgumentException("FrameHandler could not be created");
}
catch (Throwable t)
{
throw new IllegalArgumentException("FrameHandler could not be created", t);
}
initWebSocketHeaders();
super.send(listener);
}
@ -224,19 +237,13 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
}
Throwable failure = result.getFailure();
if ((failure instanceof java.net.SocketException) ||
(failure instanceof java.io.InterruptedIOException) ||
(failure instanceof HttpResponseException) ||
(failure instanceof UpgradeException))
{
// handle as-is
handleException(failure);
}
else
{
// wrap in UpgradeException
handleException(new UpgradeException(requestURI, responseStatusCode, responseLine, failure));
}
boolean wrapFailure = !((failure instanceof java.net.SocketException) ||
(failure instanceof java.io.InterruptedIOException) ||
(failure instanceof HttpResponseException) ||
(failure instanceof UpgradeException));
if (wrapFailure)
failure = new UpgradeException(requestURI, responseStatusCode, responseLine, failure);
handleException(failure);
}
if (responseStatusCode != HttpStatus.SWITCHING_PROTOCOLS_101)
@ -250,6 +257,17 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
protected void handleException(Throwable failure)
{
futureCoreSession.completeExceptionally(failure);
if (frameHandler != null)
{
try
{
frameHandler.onError(failure, Callback.NOOP);
}
catch (Throwable t)
{
LOG.warn("FrameHandler onError threw", t);
}
}
}
@SuppressWarnings("Duplicates")
@ -332,19 +350,6 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
EndPoint endp = httpConnection.getEndPoint();
customize(endp);
FrameHandler frameHandler = getFrameHandler(wsClient, response);
if (frameHandler == null)
{
StringBuilder err = new StringBuilder();
err.append("FrameHandler is null for request ").append(this.getURI().toASCIIString());
if (negotiatedSubProtocol != null)
{
err.append(" [subprotocol: ").append(negotiatedSubProtocol).append("]");
}
throw new WebSocketException(err.toString());
}
Request request = response.getRequest();
Negotiated negotiated = new Negotiated(
request.getURI(),
@ -396,7 +401,7 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
return new WebSocketCoreSession(handler, Behavior.CLIENT, negotiated);
}
public abstract FrameHandler getFrameHandler(WebSocketCoreClient coreClient, HttpResponse response);
public abstract FrameHandler getFrameHandler();
private final String genRandomKey()
{