From 9c63ed975e777a2b323c789e6e6f5d184f37617b Mon Sep 17 00:00:00 2001 From: Joakim Erdfelt Date: Tue, 14 Aug 2012 09:44:21 -0700 Subject: [PATCH] Reworking client connection handling per discussion with simone --- .../websocket/client/WebSocketClient.java | 3 +- .../client/io/WebSocketClientConnection.java | 16 +++- .../io/WebSocketClientSelectorManager.java | 79 ++++++++----------- .../test/resources/jetty-logging.properties | 2 +- 4 files changed, 48 insertions(+), 52 deletions(-) diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java index a7c89eae6b1..22196be99f4 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/WebSocketClient.java @@ -21,6 +21,7 @@ import java.net.SocketAddress; import java.net.URI; import java.nio.channels.SocketChannel; import java.util.Map; +import java.util.concurrent.Future; import org.eclipse.jetty.util.FutureCallback; import org.eclipse.jetty.util.log.Log; @@ -112,7 +113,7 @@ public class WebSocketClient this.policy = WebSocketPolicy.newClientPolicy(); } - public FutureCallback connect(URI websocketUri, Object websocketPojo) throws IOException + public Future connect(URI websocketUri, Object websocketPojo) throws IOException { if (!factory.isStarted()) { diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java index c185840de9c..4d0e05582f1 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientConnection.java @@ -6,17 +6,29 @@ import java.util.concurrent.ScheduledExecutorService; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.websocket.api.WebSocketPolicy; +import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture; import org.eclipse.jetty.websocket.client.WebSocketClientFactory; import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection; public class WebSocketClientConnection extends AbstractWebSocketConnection { private final WebSocketClientFactory factory; + private final ConnectFuture connectFuture; - public WebSocketClientConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, - ByteBufferPool bufferPool, WebSocketClientFactory factory) + public WebSocketClientConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool, + WebSocketClientFactory factory, ConnectFuture confut) { super(endp,executor,scheduler,policy,bufferPool); this.factory = factory; + this.connectFuture = confut; + } + + @Override + public void onOpen() + { + super.onOpen(); + + // TODO: Handshake handshake = new WebSocket13Handshake(this); + // TODO: getExecutor().execute(handshake); } } diff --git a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java index 0fc42d0ccc6..fc7c8431717 100644 --- a/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java +++ b/jetty-websocket/websocket-client/src/main/java/org/eclipse/jetty/websocket/client/io/WebSocketClientSelectorManager.java @@ -20,10 +20,8 @@ import java.nio.channels.SelectionKey; import java.nio.channels.SocketChannel; import java.util.concurrent.Executor; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.atomic.AtomicReference; import javax.net.ssl.SSLEngine; -import javax.net.ssl.SSLException; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.Connection; @@ -31,15 +29,19 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.SelectChannelEndPoint; import org.eclipse.jetty.io.SelectorManager; import org.eclipse.jetty.io.ssl.SslConnection; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.ssl.SslContextFactory; import org.eclipse.jetty.websocket.api.WebSocketPolicy; import org.eclipse.jetty.websocket.client.WebSocketClient; +import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture; import org.eclipse.jetty.websocket.client.WebSocketClientFactory; import org.eclipse.jetty.websocket.driver.WebSocketEventDriver; import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection; public class WebSocketClientSelectorManager extends SelectorManager { + private static final Logger LOG = Log.getLogger(WebSocketClientSelectorManager.class); private final Executor executor; private final ScheduledExecutorService scheduler; private final WebSocketPolicy policy; @@ -67,7 +69,7 @@ public class WebSocketClientSelectorManager extends SelectorManager } @Override - public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) + public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) throws IOException { LOG.debug("newConnection({},{},{})",channel,endPoint,attachment); WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment; @@ -76,49 +78,45 @@ public class WebSocketClientSelectorManager extends SelectorManager { String scheme = confut.getWebSocketUri().getScheme(); - if ((sslContextFactory != null) && ("wss".equalsIgnoreCase(scheme))) + if ("wss".equalsIgnoreCase(scheme)) { - final AtomicReference sslEndPointRef = new AtomicReference<>(); - final AtomicReference attachmentRef = new AtomicReference<>(attachment); - SSLEngine engine = newSSLEngine(sslContextFactory,channel); - SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine) + // Encrypted "wss://" + if (sslContextFactory != null) { - @Override - public void onClose() - { - sslEndPointRef.set(null); - attachmentRef.set(null); - super.onClose(); - } - }; - endPoint.setConnection(sslConnection); - EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); - sslEndPointRef.set(sslEndPoint); + SSLEngine engine = newSSLEngine(sslContextFactory,channel); + SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine); + EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint(); - startHandshake(engine); - - Connection connection = newWebSocketConnection(channel,sslEndPoint,attachment); - endPoint.setConnection(connection); - return connection; + Connection connection = newWebSocketConnection(channel,sslEndPoint,confut); + sslEndPoint.setConnection(connection); + connectionOpened(connection); + return sslConnection; + } + else + { + // FIXME: throw error + throw new IOException("Cannot init SSL"); + } } else { - Connection connection = newWebSocketConnection(channel,endPoint,attachment); - endPoint.setConnection(connection); - return connection; + // Standard "ws://" + return newWebSocketConnection(channel,endPoint,confut); } } - catch (Throwable t) + catch (IOException e) { - LOG.debug(t); - confut.failed(null,t); - throw t; + LOG.debug(e); + confut.failed(null,e); + // rethrow + throw e; } } @Override protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException { + LOG.debug("newEndPoint({}, {}, {})",channel,selectSet,selectionKey); return new SelectChannelEndPoint(channel,selectSet,selectionKey,scheduler,policy.getIdleTimeout()); } @@ -131,9 +129,8 @@ public class WebSocketClientSelectorManager extends SelectorManager return engine; } - public AbstractWebSocketConnection newWebSocketConnection(SocketChannel channel, EndPoint endPoint, Object attachment) + public AbstractWebSocketConnection newWebSocketConnection(SocketChannel channel, EndPoint endPoint, ConnectFuture confut) { - WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment; WebSocketClientFactory factory = confut.getFactory(); WebSocketEventDriver websocket = confut.getWebSocket(); @@ -142,12 +139,10 @@ public class WebSocketClientSelectorManager extends SelectorManager ByteBufferPool bufferPool = factory.getBufferPool(); ScheduledExecutorService scheduler = factory.getScheduler(); - AbstractWebSocketConnection connection = new WebSocketClientConnection(endPoint,executor,scheduler,policy,bufferPool,factory); - endPoint.setConnection(connection); + AbstractWebSocketConnection connection = new WebSocketClientConnection(endPoint,executor,scheduler,policy,bufferPool,factory,confut); connection.getParser().setIncomingFramesHandler(websocket); // TODO: track open websockets? bind open websocket to connection? - return connection; } @@ -155,16 +150,4 @@ public class WebSocketClientSelectorManager extends SelectorManager { this.sslContextFactory = sslContextFactory; } - - private void startHandshake(SSLEngine engine) - { - try - { - engine.beginHandshake(); - } - catch (SSLException x) - { - throw new RuntimeException(x); - } - } } diff --git a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties index ea04a9c9622..34eefeef2d3 100644 --- a/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties +++ b/jetty-websocket/websocket-client/src/test/resources/jetty-logging.properties @@ -1,5 +1,5 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog -org.eclipse.jetty.LEVEL=WARN +# org.eclipse.jetty.LEVEL=DEBUG # org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO # org.eclipse.jetty.websocket.LEVEL=WARN org.eclipse.jetty.websocket.LEVEL=DEBUG