Reworking client connection handling per discussion with simone

This commit is contained in:
Joakim Erdfelt 2012-08-14 09:44:21 -07:00
parent 447b9ecee5
commit 9c63ed975e
4 changed files with 48 additions and 52 deletions

View File

@ -21,6 +21,7 @@ import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.Future;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
@ -112,7 +113,7 @@ public class WebSocketClient
this.policy = WebSocketPolicy.newClientPolicy();
}
public FutureCallback<WebSocketConnection> connect(URI websocketUri, Object websocketPojo) throws IOException
public Future<WebSocketConnection> connect(URI websocketUri, Object websocketPojo) throws IOException
{
if (!factory.isStarted())
{

View File

@ -6,17 +6,29 @@ import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
public class WebSocketClientConnection extends AbstractWebSocketConnection
{
private final WebSocketClientFactory factory;
private final ConnectFuture connectFuture;
public WebSocketClientConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy,
ByteBufferPool bufferPool, WebSocketClientFactory factory)
public WebSocketClientConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool,
WebSocketClientFactory factory, ConnectFuture confut)
{
super(endp,executor,scheduler,policy,bufferPool);
this.factory = factory;
this.connectFuture = confut;
}
@Override
public void onOpen()
{
super.onOpen();
// TODO: Handshake handshake = new WebSocket13Handshake(this);
// TODO: getExecutor().execute(handshake);
}
}

View File

@ -20,10 +20,8 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLException;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
@ -31,15 +29,19 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
public class WebSocketClientSelectorManager extends SelectorManager
{
private static final Logger LOG = Log.getLogger(WebSocketClientSelectorManager.class);
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final WebSocketPolicy policy;
@ -67,7 +69,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
}
@Override
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment)
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) throws IOException
{
LOG.debug("newConnection({},{},{})",channel,endPoint,attachment);
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
@ -76,49 +78,45 @@ public class WebSocketClientSelectorManager extends SelectorManager
{
String scheme = confut.getWebSocketUri().getScheme();
if ((sslContextFactory != null) && ("wss".equalsIgnoreCase(scheme)))
if ("wss".equalsIgnoreCase(scheme))
{
final AtomicReference<EndPoint> sslEndPointRef = new AtomicReference<>();
final AtomicReference<Object> attachmentRef = new AtomicReference<>(attachment);
SSLEngine engine = newSSLEngine(sslContextFactory,channel);
SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine)
// Encrypted "wss://"
if (sslContextFactory != null)
{
@Override
public void onClose()
{
sslEndPointRef.set(null);
attachmentRef.set(null);
super.onClose();
}
};
endPoint.setConnection(sslConnection);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
sslEndPointRef.set(sslEndPoint);
SSLEngine engine = newSSLEngine(sslContextFactory,channel);
SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
startHandshake(engine);
Connection connection = newWebSocketConnection(channel,sslEndPoint,attachment);
endPoint.setConnection(connection);
return connection;
Connection connection = newWebSocketConnection(channel,sslEndPoint,confut);
sslEndPoint.setConnection(connection);
connectionOpened(connection);
return sslConnection;
}
else
{
// FIXME: throw error
throw new IOException("Cannot init SSL");
}
}
else
{
Connection connection = newWebSocketConnection(channel,endPoint,attachment);
endPoint.setConnection(connection);
return connection;
// Standard "ws://"
return newWebSocketConnection(channel,endPoint,confut);
}
}
catch (Throwable t)
catch (IOException e)
{
LOG.debug(t);
confut.failed(null,t);
throw t;
LOG.debug(e);
confut.failed(null,e);
// rethrow
throw e;
}
}
@Override
protected EndPoint newEndPoint(SocketChannel channel, ManagedSelector selectSet, SelectionKey selectionKey) throws IOException
{
LOG.debug("newEndPoint({}, {}, {})",channel,selectSet,selectionKey);
return new SelectChannelEndPoint(channel,selectSet,selectionKey,scheduler,policy.getIdleTimeout());
}
@ -131,9 +129,8 @@ public class WebSocketClientSelectorManager extends SelectorManager
return engine;
}
public AbstractWebSocketConnection newWebSocketConnection(SocketChannel channel, EndPoint endPoint, Object attachment)
public AbstractWebSocketConnection newWebSocketConnection(SocketChannel channel, EndPoint endPoint, ConnectFuture confut)
{
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
WebSocketClientFactory factory = confut.getFactory();
WebSocketEventDriver websocket = confut.getWebSocket();
@ -142,12 +139,10 @@ public class WebSocketClientSelectorManager extends SelectorManager
ByteBufferPool bufferPool = factory.getBufferPool();
ScheduledExecutorService scheduler = factory.getScheduler();
AbstractWebSocketConnection connection = new WebSocketClientConnection(endPoint,executor,scheduler,policy,bufferPool,factory);
endPoint.setConnection(connection);
AbstractWebSocketConnection connection = new WebSocketClientConnection(endPoint,executor,scheduler,policy,bufferPool,factory,confut);
connection.getParser().setIncomingFramesHandler(websocket);
// TODO: track open websockets? bind open websocket to connection?
return connection;
}
@ -155,16 +150,4 @@ public class WebSocketClientSelectorManager extends SelectorManager
{
this.sslContextFactory = sslContextFactory;
}
private void startHandshake(SSLEngine engine)
{
try
{
engine.beginHandshake();
}
catch (SSLException x)
{
throw new RuntimeException(x);
}
}
}

View File

@ -1,5 +1,5 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.LEVEL=DEBUG
# org.eclipse.jetty.io.ChannelEndPoint.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=WARN
org.eclipse.jetty.websocket.LEVEL=DEBUG