Issue #3298 - cleanup of CompletableFutures for WebSocket upgrades

Jetty and Javax ClientUpgradeRequests no longer use the combination of
the the onOpenFuture and the futureCoreSession and instead use only
the CompletableFuture future given to the FrameHandler onOpen

Signed-off-by: Lachlan Roberts <lachlan@webtide.com>
This commit is contained in:
Lachlan Roberts 2019-01-31 11:13:05 +11:00
parent 684dcd1693
commit bd4f9b30fc
5 changed files with 17 additions and 20 deletions

View File

@ -35,21 +35,21 @@ public class JavaxClientUpgradeRequest extends ClientUpgradeRequest
{ {
private final JavaxWebSocketClientContainer containerContext; private final JavaxWebSocketClientContainer containerContext;
private final Object websocketPojo; private final Object websocketPojo;
private final CompletableFuture<Session> futureJavaxSession; private final CompletableFuture<Session> futureSession;
public JavaxClientUpgradeRequest(JavaxWebSocketClientContainer clientContainer, WebSocketCoreClient coreClient, URI requestURI, Object websocketPojo) public JavaxClientUpgradeRequest(JavaxWebSocketClientContainer clientContainer, WebSocketCoreClient coreClient, URI requestURI, Object websocketPojo)
{ {
super(coreClient, requestURI); super(coreClient, requestURI);
this.containerContext = clientContainer; this.containerContext = clientContainer;
this.websocketPojo = websocketPojo; this.websocketPojo = websocketPojo;
this.futureJavaxSession = new CompletableFuture<>(); this.futureSession = new CompletableFuture<>();
} }
@Override @Override
protected void handleException(Throwable failure) protected void handleException(Throwable failure)
{ {
super.handleException(failure); super.handleException(failure);
futureJavaxSession.completeExceptionally(failure); futureSession.completeExceptionally(failure);
} }
@Override @Override
@ -58,13 +58,13 @@ public class JavaxClientUpgradeRequest extends ClientUpgradeRequest
UpgradeRequest upgradeRequest = new DelegatedJavaxClientUpgradeRequest(this); UpgradeRequest upgradeRequest = new DelegatedJavaxClientUpgradeRequest(this);
UpgradeResponse upgradeResponse = new DelegatedJavaxClientUpgradeResponse(response); UpgradeResponse upgradeResponse = new DelegatedJavaxClientUpgradeResponse(response);
JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureJavaxSession); JavaxWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, upgradeRequest, upgradeResponse, futureSession);
return frameHandler; return frameHandler;
} }
public CompletableFuture<Session> getFutureSession() public CompletableFuture<Session> getFutureSession()
{ {
return futureJavaxSession; return futureSession;
} }
} }

View File

@ -29,6 +29,7 @@ import java.util.Optional;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors; import java.util.stream.Collectors;
import javax.websocket.CloseReason; import javax.websocket.CloseReason;
import javax.websocket.Decoder; import javax.websocket.Decoder;
import javax.websocket.EndpointConfig; import javax.websocket.EndpointConfig;
@ -229,16 +230,14 @@ public class JavaxWebSocketFrameHandler implements FrameHandler
openHandle.invoke(); openHandle.invoke();
container.addBean(session, true); container.addBean(session, true);
futureSession.complete(session);
callback.succeeded(); callback.succeeded();
futureSession.complete(session);
} }
catch (Throwable cause) catch (Throwable cause)
{ {
Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause); Exception wse = new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause);
// TODO This feels like double handling of the exception? Review need for futureSession
futureSession.completeExceptionally(wse);
callback.failed(wse); callback.failed(wse);
futureSession.completeExceptionally(wse);
} }
} }

View File

@ -41,7 +41,6 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
{ {
private final WebSocketClient containerContext; private final WebSocketClient containerContext;
private final Object websocketPojo; private final Object websocketPojo;
private final CompletableFuture<Session> onOpenFuture;
private final CompletableFuture<Session> futureSession; private final CompletableFuture<Session> futureSession;
private final DelegatedJettyClientUpgradeRequest handshakeRequest; private final DelegatedJettyClientUpgradeRequest handshakeRequest;
@ -51,9 +50,7 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
super(coreClient, requestURI); super(coreClient, requestURI);
this.containerContext = clientContainer; this.containerContext = clientContainer;
this.websocketPojo = websocketPojo; this.websocketPojo = websocketPojo;
this.futureSession = new CompletableFuture<>();
this.onOpenFuture = new CompletableFuture<>();
this.futureSession = super.futureCoreSession.thenCombine(onOpenFuture, (channel, session) -> session);
if (request != null) if (request != null)
{ {
@ -103,7 +100,7 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
protected void handleException(Throwable failure) protected void handleException(Throwable failure)
{ {
super.handleException(failure); super.handleException(failure);
onOpenFuture.completeExceptionally(failure); futureSession.completeExceptionally(failure);
} }
@Override @Override
@ -112,7 +109,7 @@ public class JettyClientUpgradeRequest extends ClientUpgradeRequest
UpgradeResponse upgradeResponse = new DelegatedJettyClientUpgradeResponse(response); UpgradeResponse upgradeResponse = new DelegatedJettyClientUpgradeResponse(response);
JettyWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo, JettyWebSocketFrameHandler frameHandler = containerContext.newFrameHandler(websocketPojo,
handshakeRequest, upgradeResponse, onOpenFuture); handshakeRequest, upgradeResponse, futureSession);
return frameHandler; return frameHandler;
} }

View File

@ -141,13 +141,13 @@ public class JettyWebSocketFrameHandler implements FrameHandler
if (openHandle != null) if (openHandle != null)
openHandle.invoke(); openHandle.invoke();
futureSession.complete(session);
callback.succeeded(); callback.succeeded();
futureSession.complete(session);
} }
catch (Throwable cause) catch (Throwable cause)
{ {
// TODO should futureSession be failed here?
callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause)); callback.failed(new WebSocketException(endpointInstance.getClass().getName() + " OPEN method error: " + cause.getMessage(), cause));
futureSession.completeExceptionally(cause);
} }
} }

View File

@ -347,11 +347,12 @@ public abstract class ClientUpgradeRequest extends HttpRequest implements Respon
try try
{ {
endp.upgrade(wsConnection); endp.upgrade(wsConnection);
}
finally
{
futureCoreSession.complete(wsChannel); futureCoreSession.complete(wsChannel);
} }
catch (Throwable t)
{
futureCoreSession.completeExceptionally(t);
}
} }
/** /**