393733 - WebSocketClient interface should support multiple connections

* Initial pass at merging functions of WebSocketClientFactory into
  WebSocketClient itself, eliminating the need for the factory.
* Introducing ConnectPromise to separate the connect future from
  the client itself, hopefully allowing for multiple connections
  from the client instance.
This commit is contained in:
Joakim Erdfelt 2013-01-15 11:51:16 -07:00
parent eff1262e49
commit acaa2aa4de
23 changed files with 763 additions and 714 deletions

View File

@ -46,6 +46,8 @@ public interface Session
/**
* Return the list of extensions currently in use for this conversation.
* <p>
* Convenience method for <code>.getUpgradeResponse().getExtensions()</code>
*
* @return the negotiated extensions
*/
@ -53,6 +55,8 @@ public interface Session
/**
* Return the sub protocol agreed during the websocket handshake for this conversation.
* <p>
* Convenience method for <code>.getUpgradeResponse().getAcceptedSubProtocol()</code>
*
* @return the negotiated subprotocol
*/
@ -68,6 +72,8 @@ public interface Session
/**
* Return the query string associated with the request this session was opened under.
* <p>
* Convenience method for <code>.getUpgradeRequest().getRequestURI().getQuery()</code>
*/
String getQueryString();
@ -82,6 +88,8 @@ public interface Session
* Return the URI that this session was opened under.
* <p>
* Note, this is different than the servlet-api getRequestURI, as this will return the query portion as well.
* <p>
* Convenience method for <code>.getUpgradeRequest().getRequestURI()</code>
*
* @return the request URI.
*/
@ -95,6 +103,20 @@ public interface Session
*/
long getTimeout();
/**
* Get the UpgradeRequest used to create this session
*
* @return the UpgradeRequest used to create this session
*/
UpgradeRequest getUpgradeRequest();
/**
* Get the UpgradeResponse used to create this session
*
* @return the UpgradeResponse used to create this session
*/
UpgradeResponse getUpgradeResponse();
/**
* Return true if and only if the underlying socket is open.
*

View File

@ -60,6 +60,7 @@ public class ClientUpgradeRequest extends UpgradeRequest
private final String key;
private CookieStore cookieStore;
private int connectTimeout = 0;
public ClientUpgradeRequest()
{
@ -67,7 +68,7 @@ public class ClientUpgradeRequest extends UpgradeRequest
this.key = genRandomKey();
}
public ClientUpgradeRequest(URI requestURI)
protected ClientUpgradeRequest(URI requestURI)
{
super(requestURI);
this.key = genRandomKey();
@ -182,16 +183,6 @@ public class ClientUpgradeRequest extends UpgradeRequest
request.append("\r\n");
return request.toString();
}
@Override
public List<HttpCookie> getCookies()
{
if(cookieStore != null) {
return cookieStore.get(getRequestURI());
}
return super.getCookies();
}
private final String genRandomKey()
{
@ -200,6 +191,22 @@ public class ClientUpgradeRequest extends UpgradeRequest
return new String(B64Code.encode(bytes));
}
public int getConnectTimeout()
{
return connectTimeout;
}
@Override
public List<HttpCookie> getCookies()
{
if (cookieStore != null)
{
return cookieStore.get(getRequestURI());
}
return super.getCookies();
}
public CookieStore getCookieStore()
{
return cookieStore;
@ -210,6 +217,11 @@ public class ClientUpgradeRequest extends UpgradeRequest
return key;
}
public void setConnectTimeout(int connectTimeout)
{
this.connectTimeout = connectTimeout;
}
public void setCookieStore(CookieStore cookieStore)
{
this.cookieStore = cookieStore;

View File

@ -0,0 +1,77 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.net.CookieStore;
import java.net.HttpCookie;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
/**
* An empty implementation of the CookieStore
*/
public class EmptyCookieStore implements CookieStore
{
private final List<HttpCookie> nocookies;
private final List<URI> nouris;
public EmptyCookieStore()
{
nocookies = Collections.unmodifiableList(new ArrayList<HttpCookie>());
nouris = Collections.unmodifiableList(new ArrayList<URI>());
}
@Override
public void add(URI uri, HttpCookie cookie)
{
/* do nothing */
}
@Override
public List<HttpCookie> get(URI uri)
{
return nocookies;
}
@Override
public List<HttpCookie> getCookies()
{
return nocookies;
}
@Override
public List<URI> getURIs()
{
return nouris;
}
@Override
public boolean remove(URI uri, HttpCookie cookie)
{
return false;
}
@Override
public boolean removeAll()
{
return false;
}
}

View File

@ -19,30 +19,315 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.net.CookieStore;
import java.net.SocketAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.internal.ConnectionManager;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
public interface WebSocketClient
/**
* WebSocketClient provides a means of establishing connections to remote websocket endpoints.
*/
public class WebSocketClient extends ContainerLifeCycle
{
public Future<ClientUpgradeResponse> connect(URI websocketUri) throws IOException;
private static final Logger LOG = Log.getLogger(WebSocketClient.class);
public WebSocketClientFactory getFactory();
private final WebSocketPolicy policy;
private final SslContextFactory sslContextFactory;
private final WebSocketExtensionFactory extensionRegistry;
private final EventDriverFactory eventDriverFactory;
private ByteBufferPool bufferPool;
private Executor executor;
private Scheduler scheduler;
private CookieStore cookieStore;
/** @deprecated move into connection manager */
@Deprecated
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
private ConnectionManager connectionManager;
public Masker getMasker();
/**
* The abstract WebSocketConnection in use and established for this client.
* <p>
* Note: this is intentionally kept neutral, as WebSocketClient must be able to handle connections that are either physical (a socket connection) or virtual
* (eg: a mux connection).
*
* @deprecated move to connection/session specific place
*/
@Deprecated
private WebSocketConnection connection;
private Masker masker;
private SocketAddress bindAddress;
public WebSocketPolicy getPolicy();
public WebSocketClient()
{
this(null);
}
public ClientUpgradeRequest getUpgradeRequest();
public WebSocketClient(SslContextFactory sslContextFactory)
{
this.sslContextFactory = sslContextFactory;
this.policy = WebSocketPolicy.newClientPolicy();
this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
this.connectionManager = new ConnectionManager(bufferPool,executor,scheduler,sslContextFactory,policy);
this.masker = new RandomMasker();
this.eventDriverFactory = new EventDriverFactory(policy);
}
public ClientUpgradeResponse getUpgradeResponse();
public Future<Session> connect(Object websocket, URI toUri) throws IOException
{
ClientUpgradeRequest request = new ClientUpgradeRequest(toUri);
request.setRequestURI(toUri);
request.setCookieStore(this.cookieStore);
public EventDriver getWebSocket();
return connect(websocket,toUri,request);
}
public URI getWebSocketUri();
public Future<Session> connect(Object websocket, URI toUri, ClientUpgradeRequest request) throws IOException
{
if (!isStarted())
{
throw new IllegalStateException(WebSocketClient.class.getSimpleName() + "@" + this.hashCode() + " is not started");
}
public void setMasker(Masker masker);
// Validate websocket URI
if (!toUri.isAbsolute())
{
throw new IllegalArgumentException("WebSocket URI must be absolute");
}
if (StringUtil.isBlank(toUri.getScheme()))
{
throw new IllegalArgumentException("WebSocket URI must include a scheme");
}
String scheme = toUri.getScheme().toLowerCase(Locale.ENGLISH);
if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
{
throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
}
request.setRequestURI(toUri);
if (request.getCookieStore() == null)
{
request.setCookieStore(this.cookieStore);
}
// Validate websocket URI
LOG.debug("connect websocket:{} to:{}",websocket,toUri);
// Grab Connection Manager
ConnectionManager manager = getConnectionManager();
// Setup Driver for user provided websocket
EventDriver driver = eventDriverFactory.wrap(websocket);
// Create the appropriate (physical vs virtual) connection task
FutureTask<Session> connectTask = manager.connect(this,driver,request);
// Execute the connection on the executor thread
executor.execute(connectTask);
// Return the future
return connectTask;
}
@Override
protected void doStart() throws Exception
{
LOG.debug("Starting {}",this);
if (sslContextFactory != null)
{
addBean(sslContextFactory);
}
String name = WebSocketClient.class.getSimpleName() + "@" + hashCode();
if (executor == null)
{
QueuedThreadPool threadPool = new QueuedThreadPool();
threadPool.setName(name);
executor = threadPool;
}
addBean(executor);
if (bufferPool == null)
{
bufferPool = new MappedByteBufferPool();
}
addBean(bufferPool);
if (scheduler == null)
{
scheduler = new ScheduledExecutorScheduler(name + "-scheduler",false);
}
addBean(scheduler);
if (cookieStore == null)
{
cookieStore = new EmptyCookieStore();
}
this.connectionManager = new ConnectionManager(bufferPool,executor,scheduler,sslContextFactory,policy);
addBean(this.connectionManager);
super.doStart();
LOG.info("Started {}",this);
}
@Override
protected void doStop() throws Exception
{
LOG.debug("Stopping {}",this);
if (cookieStore != null)
{
cookieStore.removeAll();
cookieStore = null;
}
super.doStop();
LOG.info("Stopped {}",this);
}
public SocketAddress getBindAddress()
{
return bindAddress;
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
public WebSocketConnection getConnection()
{
return this.connection;
}
public ConnectionManager getConnectionManager()
{
return connectionManager;
}
public Executor getExecutor()
{
return executor;
}
public ExtensionFactory getExtensionFactory()
{
return extensionRegistry;
}
public Masker getMasker()
{
return masker;
}
public WebSocketPolicy getPolicy()
{
return this.policy;
}
public Scheduler getScheduler()
{
return scheduler;
}
/**
* @return the {@link SslContextFactory} that manages TLS encryption
* @see WebSocketClient(SslContextFactory)
*/
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
}
public List<Extension> initExtensions(List<ExtensionConfig> requested)
{
List<Extension> extensions = new ArrayList<Extension>();
for (ExtensionConfig cfg : requested)
{
Extension extension = extensionRegistry.newInstance(cfg);
if (extension == null)
{
continue;
}
LOG.debug("added {}",extension);
extensions.add(extension);
}
LOG.debug("extensions={}",extensions);
return extensions;
}
public boolean sessionClosed(WebSocketSession session)
{
return isRunning() && sessions.remove(session);
}
public boolean sessionOpened(WebSocketSession session)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Session Opened: {}",session);
}
boolean ret = sessions.offer(session);
session.open();
return ret;
}
public void setBindAdddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
public void setBufferPool(ByteBufferPool bufferPool)
{
this.bufferPool = bufferPool;
}
public void setExecutor(Executor executor)
{
this.executor = executor;
}
public void setMasker(Masker masker)
{
this.masker = masker;
}
}

View File

@ -1,241 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.MappedByteBufferPool;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.util.thread.TimerScheduler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Extension;
import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.ExtensionFactory;
import org.eclipse.jetty.websocket.client.internal.ConnectionManager;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
import org.eclipse.jetty.websocket.common.events.EventDriverFactory;
import org.eclipse.jetty.websocket.common.extensions.WebSocketExtensionFactory;
public class WebSocketClientFactory extends ContainerLifeCycle
{
private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class);
private final ByteBufferPool bufferPool = new MappedByteBufferPool();
private final Executor executor;
private final Scheduler scheduler;
private final EventDriverFactory eventDriverFactory;
private final WebSocketPolicy policy;
private final WebSocketExtensionFactory extensionRegistry;
private SocketAddress bindAddress;
private Masker masker;
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
private ConnectionManager connectionManager;
public WebSocketClientFactory()
{
this(new QueuedThreadPool());
}
public WebSocketClientFactory(Executor threadPool)
{
this(threadPool,new TimerScheduler());
}
public WebSocketClientFactory(Executor threadPool, Scheduler scheduler)
{
this(threadPool,scheduler,null);
}
public WebSocketClientFactory(Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory)
{
LOG.debug("new WebSocketClientFactory()");
if (executor == null)
{
throw new IllegalArgumentException("Executor is required");
}
this.executor = executor;
addBean(executor);
if (scheduler == null)
{
throw new IllegalArgumentException("Scheduler is required");
}
this.scheduler = scheduler;
addBean(scheduler);
if (sslContextFactory != null)
{
addBean(sslContextFactory);
}
this.policy = WebSocketPolicy.newClientPolicy();
this.extensionRegistry = new WebSocketExtensionFactory(policy,bufferPool);
this.connectionManager = new ConnectionManager(bufferPool,executor,scheduler,sslContextFactory,policy);
addBean(this.connectionManager);
this.eventDriverFactory = new EventDriverFactory(policy);
this.masker = new RandomMasker();
}
public WebSocketClientFactory(SslContextFactory sslContextFactory)
{
this(new QueuedThreadPool(),new TimerScheduler(),sslContextFactory);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
LOG.debug("doStart()");
}
@Override
protected void doStop() throws Exception
{
super.doStop();
LOG.debug("doStop()");
}
/**
* The address to bind local physical (outgoing) TCP Sockets to.
*
* @return the address to bind the socket channel to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
return bindAddress;
}
public ByteBufferPool getBufferPool()
{
return bufferPool;
}
public ConnectionManager getConnectionManager()
{
return connectionManager;
}
public Executor getExecutor()
{
return executor;
}
public ExtensionFactory getExtensionFactory()
{
return extensionRegistry;
}
/**
*
* @return the masker or null if none is set
*/
public Masker getMasker()
{
return masker;
}
public WebSocketPolicy getPolicy()
{
return policy;
}
public Scheduler getScheduler()
{
return scheduler;
}
public List<Extension> initExtensions(List<ExtensionConfig> requested)
{
List<Extension> extensions = new ArrayList<Extension>();
for (ExtensionConfig cfg : requested)
{
Extension extension = extensionRegistry.newInstance(cfg);
if (extension == null)
{
continue;
}
LOG.debug("added {}",extension);
extensions.add(extension);
}
LOG.debug("extensions={}",extensions);
return extensions;
}
public WebSocketClient newWebSocketClient(Object websocketPojo)
{
LOG.debug("Creating new WebSocket for {}",websocketPojo);
EventDriver websocket = eventDriverFactory.wrap(websocketPojo);
DefaultWebSocketClient client = new DefaultWebSocketClient(this,websocket);
client.setMasker(masker);
return client;
}
public boolean sessionClosed(WebSocketSession session)
{
return isRunning() && sessions.remove(session);
}
public boolean sessionOpened(WebSocketSession session)
{
if (LOG.isDebugEnabled())
{
LOG.debug("Session Opened: {}",session);
}
boolean ret = sessions.offer(session);
session.open();
return ret;
}
/**
* @param bindAddress
* the address to bind the socket channel to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
public void setMasker(Masker masker)
{
this.masker = masker;
}
}

View File

@ -0,0 +1,72 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client.internal;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.common.events.EventDriver;
/**
* Holder for the pending connect information.
*/
public class ConnectPromise extends FuturePromise<Session>
{
private final WebSocketClient client;
private final EventDriver driver;
private final ClientUpgradeRequest request;
private final Masker masker;
private ClientUpgradeResponse response;
public ConnectPromise(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
{
this.client = client;
this.driver = driver;
this.request = request;
this.masker = client.getMasker();
}
public WebSocketClient getClient()
{
return client;
}
public EventDriver getDriver()
{
return this.driver;
}
public Masker getMasker()
{
return masker;
}
public ClientUpgradeRequest getRequest()
{
return this.request;
}
public void setResponse(ClientUpgradeResponse response)
{
this.response = response;
}
}

View File

@ -18,7 +18,7 @@
package org.eclipse.jetty.websocket.client.internal;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
@ -27,9 +27,11 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Locale;
import java.util.Queue;
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.component.ContainerLifeCycle;
@ -37,10 +39,13 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.internal.io.WebSocketClientSelectorManager;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
/**
* Internal Connection/Client Manager used to track active clients, their physical vs virtual connection information, and provide some means to create new
@ -48,6 +53,56 @@ import org.eclipse.jetty.websocket.client.internal.io.WebSocketClientSelectorMan
*/
public class ConnectionManager extends ContainerLifeCycle
{
private class PhysicalConnect extends ConnectPromise implements Callable<Session>
{
private SocketAddress bindAddress;
public PhysicalConnect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
{
super(client,driver,request);
this.bindAddress = client.getBindAddress();
}
@Override
public Session call() throws Exception
{
SocketChannel channel = SocketChannel.open();
if (bindAddress != null)
{
channel.bind(bindAddress);
}
URI wsUri = getRequest().getRequestURI();
channel.socket().setTcpNoDelay(true); // disable nagle
channel.configureBlocking(false); // async always
InetSocketAddress address = toSocketAddress(wsUri);
LOG.debug("Connect to {}",address);
channel.connect(address);
getSelector().connect(channel,this);
int connectTimeout = getRequest().getConnectTimeout();
return getDriver().awaitActiveSession(connectTimeout,TimeUnit.MILLISECONDS);
}
}
private class VirtualConnect extends ConnectPromise implements Callable<Session>
{
public VirtualConnect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
{
super(client,driver,request);
}
@Override
public Session call() throws Exception
{
// TODO Auto-generated method stub
throw new ConnectException("MUX Not yet supported");
}
}
private static final Logger LOG = Log.getLogger(ConnectionManager.class);
public static InetSocketAddress toSocketAddress(URI uri)
@ -80,7 +135,8 @@ public class ConnectionManager extends ContainerLifeCycle
return new InetSocketAddress(uri.getHost(),port);
}
private final Queue<DefaultWebSocketClient> clients = new ConcurrentLinkedQueue<>();
private final Queue<WebSocketSession> sessions = new ConcurrentLinkedQueue<>();
private final WebSocketClientSelectorManager selector;
public ConnectionManager(ByteBufferPool bufferPool, Executor executor, Scheduler scheduler, SslContextFactory sslContextFactory, WebSocketPolicy policy)
@ -91,73 +147,68 @@ public class ConnectionManager extends ContainerLifeCycle
addBean(selector);
}
public void addClient(DefaultWebSocketClient client)
public void addSession(WebSocketSession session)
{
clients.add(client);
sessions.add(session);
}
private void closeAllConnections()
{
for (DefaultWebSocketClient client : clients)
for (WebSocketSession session : sessions)
{
if (client.getConnection() != null)
if (session.getConnection() != null)
{
try
{
client.getConnection().close();
session.getConnection().close();
}
catch (IOException e)
catch (Throwable t)
{
LOG.debug("During Close All Connections",e);
LOG.debug("During Close All Connections",t);
}
}
}
}
public Future<ClientUpgradeResponse> connectPhysical(DefaultWebSocketClient client) throws IOException
public FutureTask<Session> connect(WebSocketClient client, EventDriver driver, ClientUpgradeRequest request)
{
SocketChannel channel = SocketChannel.open();
SocketAddress bindAddress = client.getFactory().getBindAddress();
if (bindAddress != null)
URI toUri = request.getRequestURI();
String hostname = toUri.getHost();
if (isVirtualConnectionPossibleTo(hostname))
{
channel.bind(bindAddress);
return new FutureTask<Session>(new VirtualConnect(client,driver,request));
}
URI wsUri = client.getWebSocketUri();
channel.socket().setTcpNoDelay(true); // disable nagle
channel.configureBlocking(false); // async allways
InetSocketAddress address = toSocketAddress(wsUri);
LOG.debug("Connect to {}",address);
channel.connect(address);
getSelector().connect(channel,client);
return client;
}
public Future<ClientUpgradeResponse> connectVirtual(WebSocketClient client)
{
// TODO Auto-generated method stub
return null;
return new FutureTask<Session>(new PhysicalConnect(client,driver,request));
}
@Override
protected void doStop() throws Exception
{
closeAllConnections();
clients.clear();
sessions.clear();
super.doStop();
}
public Collection<DefaultWebSocketClient> getClients()
{
return Collections.unmodifiableCollection(clients);
}
public WebSocketClientSelectorManager getSelector()
{
return selector;
}
public Collection<WebSocketSession> getSessions()
{
return Collections.unmodifiableCollection(sessions);
}
private boolean isVirtualConnectionPossibleTo(String hostname)
{
// TODO Auto-generated method stub
return false;
}
public void removeSession(WebSocketSession session)
{
sessions.remove(session);
}
}

View File

@ -1,193 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client.internal;
import java.io.IOException;
import java.net.URI;
import java.util.Locale;
import java.util.concurrent.Future;
import org.eclipse.jetty.util.FuturePromise;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.client.masks.RandomMasker;
import org.eclipse.jetty.websocket.common.events.EventDriver;
/**
* WebSocketClient for working with Upgrade (request and response), and establishing connections to the websocket URI of your choice.
*/
public class DefaultWebSocketClient extends FuturePromise<ClientUpgradeResponse> implements WebSocketClient
{
private static final Logger LOG = Log.getLogger(DefaultWebSocketClient.class);
private final WebSocketClientFactory factory;
private final WebSocketPolicy policy;
private final EventDriver websocket;
private URI websocketUri;
/**
* The abstract WebSocketConnection in use and established for this client.
* <p>
* Note: this is intentionally kept neutral, as WebSocketClient must be able to handle connections that are either physical (a socket connection) or virtual
* (eg: a mux connection).
*/
private WebSocketConnection connection;
private ClientUpgradeRequest upgradeRequest;
private ClientUpgradeResponse upgradeResponse;
private Masker masker;
public DefaultWebSocketClient(WebSocketClientFactory factory, EventDriver websocket)
{
this.factory = factory;
LOG.debug("factory.isRunning(): {}",factory.isRunning());
LOG.debug("factory.isStarted(): {}",factory.isStarted());
this.policy = factory.getPolicy();
this.websocket = websocket;
this.upgradeRequest = new ClientUpgradeRequest();
this.masker = new RandomMasker();
}
@Override
public Future<ClientUpgradeResponse> connect(URI websocketUri) throws IOException
{
if (!factory.isStarted())
{
throw new IllegalStateException(WebSocketClientFactory.class.getSimpleName() + " is not started");
}
// Validate websocket URI
if (!websocketUri.isAbsolute())
{
throw new IllegalArgumentException("WebSocket URI must be absolute");
}
if (StringUtil.isBlank(websocketUri.getScheme()))
{
throw new IllegalArgumentException("WebSocket URI must include a scheme");
}
String scheme = websocketUri.getScheme().toLowerCase(Locale.ENGLISH);
if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
{
throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
}
this.websocketUri = websocketUri;
// Validate websocket URI
Future<ClientUpgradeResponse> result = null;
LOG.debug("connect({})",websocketUri);
ConnectionManager manager = factory.getConnectionManager();
// Check with factory first for possible alternate connect mechanism (such as mux)
result = manager.connectVirtual(this);
if (result == null)
{
// No such connect option, attempt to use a physical connection
result = manager.connectPhysical(this);
}
return result;
}
@Override
public void failed(Throwable cause)
{
LOG.debug("failed() - {}",cause);
super.failed(cause);
}
protected ClientUpgradeRequest getClientUpgradeRequest()
{
return upgradeRequest;
}
public WebSocketConnection getConnection()
{
return this.connection;
}
@Override
public WebSocketClientFactory getFactory()
{
return factory;
}
@Override
public Masker getMasker()
{
return masker;
}
@Override
public WebSocketPolicy getPolicy()
{
return this.policy;
}
@Override
public ClientUpgradeRequest getUpgradeRequest()
{
return upgradeRequest;
}
@Override
public ClientUpgradeResponse getUpgradeResponse()
{
return upgradeResponse;
}
@Override
public EventDriver getWebSocket()
{
return websocket;
}
@Override
public URI getWebSocketUri()
{
return websocketUri;
}
@Override
public void setMasker(Masker masker)
{
this.masker = masker;
}
public void setUpgradeResponse(ClientUpgradeResponse response)
{
this.upgradeResponse = response;
}
@Override
public void succeeded(ClientUpgradeResponse response)
{
LOG.debug("completed() - {}",response);
super.succeeded(response);
}
}

View File

@ -41,7 +41,7 @@ import org.eclipse.jetty.websocket.api.extensions.ExtensionConfig;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.ClientUpgradeResponse;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.internal.ConnectPromise;
import org.eclipse.jetty.websocket.common.AcceptHash;
import org.eclipse.jetty.websocket.common.WebSocketSession;
import org.eclipse.jetty.websocket.common.events.EventDriver;
@ -58,7 +58,7 @@ public class UpgradeConnection extends AbstractConnection
@Override
public void run()
{
URI uri = client.getWebSocketUri();
URI uri = connectPromise.getRequest().getRequestURI();
request.setRequestURI(uri);
String rawRequest = request.generate();
@ -78,24 +78,24 @@ public class UpgradeConnection extends AbstractConnection
private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
private final ByteBufferPool bufferPool;
private final DefaultWebSocketClient client;
private final ConnectPromise connectPromise;
private final HttpResponseHeaderParser parser;
private ClientUpgradeRequest request;
public UpgradeConnection(EndPoint endp, Executor executor, DefaultWebSocketClient client)
public UpgradeConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor);
this.client = client;
this.bufferPool = client.getFactory().getBufferPool();
this.connectPromise = connectPromise;
this.bufferPool = connectPromise.getClient().getBufferPool();
this.parser = new HttpResponseHeaderParser();
try
{
this.request = client.getUpgradeRequest();
this.request = connectPromise.getRequest();
}
catch (ClassCastException e)
{
client.failed(new RuntimeException("Invalid Upgrade Request structure",e));
connectPromise.failed(new RuntimeException("Invalid Upgrade Request structure",e));
}
}
@ -115,7 +115,7 @@ public class UpgradeConnection extends AbstractConnection
private void notifyConnect(ClientUpgradeResponse response)
{
client.succeeded(response);
connectPromise.setResponse(response);
}
@Override
@ -181,7 +181,6 @@ public class UpgradeConnection extends AbstractConnection
if (resp != null)
{
// Got a response!
client.setUpgradeResponse(resp);
validateResponse(resp);
notifyConnect(resp);
upgradeConnection(resp);
@ -192,13 +191,13 @@ public class UpgradeConnection extends AbstractConnection
}
catch (IOException e)
{
client.failed(e);
connectPromise.failed(e);
disconnect(false);
return false;
}
catch (UpgradeException e)
{
client.failed(e);
connectPromise.failed(e);
disconnect(false);
return false;
}
@ -208,11 +207,11 @@ public class UpgradeConnection extends AbstractConnection
{
EndPoint endp = getEndPoint();
Executor executor = getExecutor();
WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,client);
WebSocketClientConnection connection = new WebSocketClientConnection(endp,executor,connectPromise);
// Initialize / Negotiate Extensions
EventDriver websocket = client.getWebSocket();
WebSocketPolicy policy = client.getPolicy();
EventDriver websocket = connectPromise.getDriver();
WebSocketPolicy policy = connectPromise.getClient().getPolicy();
String acceptedSubProtocol = response.getAcceptedSubProtocol();
WebSocketSession session = new WebSocketSession(request.getRequestURI(),websocket,connection);
@ -220,7 +219,7 @@ public class UpgradeConnection extends AbstractConnection
session.setNegotiatedSubprotocol(acceptedSubProtocol);
connection.setSession(session);
List<Extension> extensions = client.getFactory().initExtensions(response.getExtensions());
List<Extension> extensions = connectPromise.getClient().initExtensions(response.getExtensions());
// Start with default routing.
IncomingFrames incoming = session;

View File

@ -28,8 +28,8 @@ import org.eclipse.jetty.websocket.api.ProtocolException;
import org.eclipse.jetty.websocket.api.WriteCallback;
import org.eclipse.jetty.websocket.api.extensions.Frame;
import org.eclipse.jetty.websocket.api.extensions.IncomingFrames;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.internal.ConnectPromise;
import org.eclipse.jetty.websocket.client.internal.ConnectionManager;
import org.eclipse.jetty.websocket.client.masks.Masker;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
@ -40,26 +40,19 @@ import org.eclipse.jetty.websocket.common.io.AbstractWebSocketConnection;
public class WebSocketClientConnection extends AbstractWebSocketConnection
{
private static final Logger LOG = Log.getLogger(WebSocketClientConnection.class);
private final WebSocketClientFactory factory;
private final DefaultWebSocketClient client;
private final ConnectPromise connectPromise;
private final Masker masker;
private boolean connected;
public WebSocketClientConnection(EndPoint endp, Executor executor, DefaultWebSocketClient client)
public WebSocketClientConnection(EndPoint endp, Executor executor, ConnectPromise connectPromise)
{
super(endp,executor,client.getFactory().getScheduler(),client.getPolicy(),client.getFactory().getBufferPool());
this.client = client;
this.factory = client.getFactory();
super(endp,executor,connectPromise.getClient().getScheduler(),connectPromise.getClient().getPolicy(),connectPromise.getClient().getBufferPool());
this.connectPromise = connectPromise;
this.connected = false;
this.masker = client.getMasker();
this.masker = connectPromise.getMasker();
assert (this.masker != null);
}
public DefaultWebSocketClient getClient()
{
return client;
}
@Override
public InetSocketAddress getLocalAddress()
{
@ -76,7 +69,8 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
public void onClose()
{
super.onClose();
factory.sessionClosed(getSession());
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.removeSession(getSession());
}
@Override
@ -84,7 +78,8 @@ public class WebSocketClientConnection extends AbstractWebSocketConnection
{
if (!connected)
{
factory.sessionOpened(getSession());
ConnectionManager connectionManager = connectPromise.getClient().getConnectionManager();
connectionManager.addSession(getSession());
connected = true;
}
super.onOpen();

View File

@ -22,6 +22,7 @@ import java.io.IOException;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.concurrent.Executor;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.ByteBufferPool;
@ -35,8 +36,8 @@ import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.Scheduler;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.client.internal.DefaultWebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.internal.ConnectPromise;
public class WebSocketClientSelectorManager extends SelectorManager
{
@ -47,30 +48,32 @@ public class WebSocketClientSelectorManager extends SelectorManager
public WebSocketClientSelectorManager(ByteBufferPool bufferPool, Executor executor, Scheduler scheduler, WebSocketPolicy policy)
{
super(executor, scheduler);
super(executor,scheduler);
this.bufferPool = bufferPool;
this.policy = policy;
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
ConnectPromise connect = (ConnectPromise)attachment;
connect.failed(ex);
}
public SslContextFactory getSslContextFactory()
{
return sslContextFactory;
}
public void setSslContextFactory(SslContextFactory sslContextFactory)
{
this.sslContextFactory = sslContextFactory;
}
@Override
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) throws IOException
{
LOG.debug("newConnection({},{},{})",channel,endPoint,attachment);
DefaultWebSocketClient client = (DefaultWebSocketClient)attachment;
ConnectPromise connectPromise = (ConnectPromise)attachment;
try
{
String scheme = client.getWebSocketUri().getScheme();
String scheme = connectPromise.getRequest().getRequestURI().getScheme();
if ("wss".equalsIgnoreCase(scheme))
{
@ -82,24 +85,26 @@ public class WebSocketClientSelectorManager extends SelectorManager
SslConnection sslConnection = new SslConnection(bufferPool,getExecutor(),endPoint,engine);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
Connection connection = newUpgradeConnection(channel,sslEndPoint,client);
Connection connection = newUpgradeConnection(channel,sslEndPoint,connectPromise);
sslEndPoint.setConnection(connection);
connectionOpened(connection);
return sslConnection;
}
else
{
throw new IOException("Cannot init SSL");
}
}
else
{
// Standard "ws://"
return newUpgradeConnection(channel,endPoint,client);
return newUpgradeConnection(channel,endPoint,connectPromise);
}
}
catch (IOException e)
{
LOG.debug(e);
client.failed(e);
connectPromise.failed(e);
// rethrow
throw e;
}
@ -121,21 +126,16 @@ public class WebSocketClientSelectorManager extends SelectorManager
return engine;
}
public UpgradeConnection newUpgradeConnection(SocketChannel channel, EndPoint endPoint, DefaultWebSocketClient client)
public UpgradeConnection newUpgradeConnection(SocketChannel channel, EndPoint endPoint, ConnectPromise connectPromise)
{
WebSocketClientFactory factory = client.getFactory();
Executor executor = factory.getExecutor();
UpgradeConnection connection = new UpgradeConnection(endPoint,executor,client);
// track the client
factory.getConnectionManager().addClient(client);
WebSocketClient client = connectPromise.getClient();
Executor executor = client.getExecutor();
UpgradeConnection connection = new UpgradeConnection(endPoint,executor,connectPromise);
return connection;
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
public void setSslContextFactory(SslContextFactory sslContextFactory)
{
DefaultWebSocketClient client = (DefaultWebSocketClient)attachment;
client.failed(ex);
this.sslContextFactory = sslContextFactory;
}
}

View File

@ -25,6 +25,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
@ -47,14 +48,14 @@ public class BadNetworkTest
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClientFactory factory;
private WebSocketClient client;
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(250);
factory.start();
client = new WebSocketClient();
client.getPolicy().setIdleTimeout(250);
client.start();
}
@Before
@ -65,9 +66,9 @@ public class BadNetworkTest
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@After
@ -80,10 +81,9 @@ public class BadNetworkTest
public void testAbruptClientClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
@ -111,10 +111,9 @@ public class BadNetworkTest
public void testAbruptServerClose() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();

View File

@ -27,6 +27,7 @@ import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
@ -46,13 +47,13 @@ public class ClientConnectTest
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClientFactory factory;
private WebSocketClient client;
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.start();
client = new WebSocketClient();
client.start();
}
@Before
@ -63,9 +64,9 @@ public class ClientConnectTest
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@After
@ -78,10 +79,9 @@ public class ClientConnectTest
public void testBadHandshake() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
@ -105,10 +105,9 @@ public class ClientConnectTest
public void testBadUpgrade() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
@ -132,10 +131,9 @@ public class ClientConnectTest
public void testConnectionNotAccepted() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
// Intentionally not accept incoming socket.
// server.accept();
@ -157,11 +155,10 @@ public class ClientConnectTest
public void testConnectionRefused() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
// The attempt to get upgrade response future should throw error
try
@ -180,10 +177,9 @@ public class ClientConnectTest
public void testConnectionTimeout() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
@ -41,14 +42,14 @@ public class SlowClientTest
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClientFactory factory;
private WebSocketClient client;
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(60000);
factory.start();
client = new WebSocketClient();
client.getPolicy().setIdleTimeout(60000);
client.start();
}
@Before
@ -59,9 +60,9 @@ public class SlowClientTest
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@After
@ -75,11 +76,10 @@ public class SlowClientTest
public void testClientSlowToSend() throws Exception
{
TrackingSocket tsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(tsocket,wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);

View File

@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.toolchain.test.annotation.Slow;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
@ -42,14 +43,14 @@ public class SlowServerTest
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClientFactory factory;
private WebSocketClient client;
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(60000);
factory.start();
client = new WebSocketClient();
client.getPolicy().setIdleTimeout(60000);
client.start();
}
@Before
@ -60,9 +61,9 @@ public class SlowServerTest
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@After
@ -76,12 +77,11 @@ public class SlowServerTest
public void testServerSlowToRead() throws Exception
{
TrackingSocket tsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.setMasker(new ZeroMasker());
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(tsocket,wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
@ -127,12 +127,11 @@ public class SlowServerTest
// final Exchanger<String> exchanger = new Exchanger<String>();
TrackingSocket tsocket = new TrackingSocket();
// tsocket.messageExchanger = exchanger;
WebSocketClient client = factory.newWebSocketClient(tsocket);
client.setMasker(new ZeroMasker());
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(tsocket,wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);

View File

@ -25,6 +25,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
@ -45,14 +46,14 @@ public class TimeoutTest
public TestTracker tt = new TestTracker();
private BlockheadServer server;
private WebSocketClientFactory factory;
private WebSocketClient client;
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
factory.start();
client = new WebSocketClient();
client.getPolicy().setIdleTimeout(250); // idle timeout (for all tests here)
client.start();
}
@Before
@ -63,9 +64,9 @@ public class TimeoutTest
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@After
@ -75,7 +76,7 @@ public class TimeoutTest
}
/**
* In a situation where the upgrade/connection is successfull, and there is no activity for a while, the idle timeout triggers on the client side and
* In a situation where the upgrade/connection is successful, and there is no activity for a while, the idle timeout triggers on the client side and
* automatically initiates a close handshake.
*/
@Test
@ -83,10 +84,8 @@ public class TimeoutTest
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();

View File

@ -59,7 +59,7 @@ public class WebSocketClientBadUriTest
@Rule
public TestTracker tt = new TestTracker();
private WebSocketClientFactory factory;
private WebSocketClient client;
private final String uriStr;
private final URI uri;
@ -70,27 +70,26 @@ public class WebSocketClientBadUriTest
}
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.start();
client = new WebSocketClient();
client.start();
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@Test
public void testBadURI() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
try
{
client.connect(uri); // should toss exception
client.connect(wsocket,uri); // should toss exception
Assert.fail("Expected IllegalArgumentException");
}

View File

@ -1,48 +0,0 @@
//
// ========================================================================
// Copyright (c) 1995-2013 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import org.eclipse.jetty.toolchain.test.TestTracker;
import org.eclipse.jetty.websocket.client.masks.ZeroMasker;
import org.junit.Assert;
import org.junit.Rule;
import org.junit.Test;
public class WebSocketClientFactoryTest
{
@Rule
public TestTracker tt = new TestTracker();
@Test
public void testNewSocket()
{
WebSocketClientFactory factory = new WebSocketClientFactory();
factory.setMasker(new ZeroMasker());
WebSocketClient client = factory.newWebSocketClient(new TrackingSocket());
Assert.assertThat("Client",client,notNullValue());
Assert.assertThat("Client.factory",client.getFactory(),is(factory));
Assert.assertThat("Client.policy",client.getPolicy(),is(factory.getPolicy()));
Assert.assertThat("Client.masker", client.getMasker(), notNullValue());
Assert.assertThat("Client.upgradeRequest",client.getUpgradeRequest(),notNullValue());
Assert.assertThat("Client.upgradeResponse",client.getUpgradeResponse(),nullValue());
}
}

View File

@ -26,8 +26,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.toolchain.test.AdvancedRunner;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.common.WebSocketFrame;
@ -41,13 +40,13 @@ import org.junit.runner.RunWith;
public class WebSocketClientTest
{
private BlockheadServer server;
private WebSocketClientFactory factory;
private WebSocketClient client;
@Before
public void startFactory() throws Exception
public void startClient() throws Exception
{
factory = new WebSocketClientFactory();
factory.start();
client = new WebSocketClient();
client.start();
}
@Before
@ -58,9 +57,9 @@ public class WebSocketClientTest
}
@After
public void stopFactory() throws Exception
public void stopClient() throws Exception
{
factory.stop();
client.stop();
}
@After
@ -74,25 +73,24 @@ public class WebSocketClientTest
{
TrackingSocket cliSock = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(cliSock);
client.getPolicy().setIdleTimeout(10000);
URI wsUri = server.getWsUri();
UpgradeRequest request = client.getUpgradeRequest();
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols("echo");
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(cliSock,wsUri,request);
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
UpgradeResponse resp = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Response",resp,notNullValue());
Assert.assertThat("Response.success",resp.isSuccess(),is(true));
Session sess = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.active",sess.isActive(),is(true));
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("Factory.sockets.size",factory.getConnectionManager().getClients().size(),is(1));
Assert.assertThat("client.connectionManager.sessions.size",client.getConnectionManager().getSessions().size(),is(1));
cliSock.getConnection().write("Hello World!");
srvSock.echoMessage(1,TimeUnit.MILLISECONDS,500);
@ -106,17 +104,16 @@ public class WebSocketClientTest
public void testBasicEcho_FromServer() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
Future<ClientUpgradeResponse> future = client.connect(server.getWsUri());
Future<Session> future = client.connect(wsocket,server.getWsUri());
// Server
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
// Validate connect
UpgradeResponse resp = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Response",resp,notNullValue());
Assert.assertThat("Response.success",resp.isSuccess(),is(true));
Session sess = future.get(500,TimeUnit.MILLISECONDS);
Assert.assertThat("Session",sess,notNullValue());
Assert.assertThat("Session.active",sess.isActive(),is(true));
// Have server send initial message
srvSock.write(WebSocketFrame.text("Hello World"));
@ -132,15 +129,14 @@ public class WebSocketClientTest
@Test
public void testLocalRemoteAddress() throws Exception
{
WebSocketClientFactory fact = new WebSocketClientFactory();
WebSocketClient fact = new WebSocketClient();
fact.start();
try
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = fact.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
@ -171,17 +167,16 @@ public class WebSocketClientTest
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
WebSocketClientFactory factSmall = new WebSocketClientFactory();
WebSocketClient factSmall = new WebSocketClient();
factSmall.start();
try
{
int bufferSize = 512;
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factSmall.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<ClientUpgradeResponse> future = client.connect(wsUri);
Future<Session> future = client.connect(wsocket,wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();

View File

@ -29,8 +29,8 @@ import org.eclipse.jetty.websocket.api.annotations.OnWebSocketClose;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.api.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.api.annotations.WebSocket;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
/**
* Example of a simple Echo Client.
@ -99,16 +99,17 @@ public class SimpleEchoClient
destUri = args[0];
}
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = new WebSocketClient();
SimpleEchoSocket socket = new SimpleEchoSocket();
try
{
factory.start();
WebSocketClient client = factory.newWebSocketClient(socket);
client.start();
URI echoUri = new URI(destUri);
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.addExtensions("x-webkit-deflate-frame");
client.connect(socket,echoUri,request);
System.out.printf("Connecting to : %s%n",echoUri);
client.getUpgradeRequest().addExtensions("x-webkit-deflate-frame");
client.connect(echoUri);
// wait for closed socket connection.
socket.awaitClose(5,TimeUnit.SECONDS);
@ -121,7 +122,7 @@ public class SimpleEchoClient
{
try
{
factory.stop();
client.stop();
}
catch (Exception e)
{

View File

@ -31,8 +31,8 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.client.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.common.OpCode;
/**
@ -177,17 +177,16 @@ public class TestClient
}
TestClient[] client = new TestClient[clients];
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient wsclient = new WebSocketClient();
try
{
wsclient.start();
__start = System.currentTimeMillis();
protocol = protocol == null?"echo":protocol;
for (int i = 0; i < clients; i++)
{
client[i] = new TestClient(factory,host,port,protocol,60000);
client[i] = new TestClient(wsclient,host,port,protocol,60000);
client[i].open();
}
@ -254,7 +253,7 @@ public class TestClient
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get() / 1000000.0,__messagesReceived.get() == 0?0.0:(__totalTime.get()
/ __messagesReceived.get() / 1000000.0),__maxDuration.get() / 1000000.0);
factory.stop();
wsclient.stop();
}
}
@ -280,12 +279,12 @@ public class TestClient
int _messageBytes;
int _frames;
byte _opcode = -1;
private WebSocketClientFactory factory;
private WebSocketClient client;
private TestSocket socket;
public TestClient(WebSocketClientFactory factory, String host, int port, String protocol, int timeoutMS) throws Exception
public TestClient(WebSocketClient client, String host, int port, String protocol, int timeoutMS) throws Exception
{
this.factory = factory;
this.client = client;
_host = host;
_port = port;
_protocol = protocol;
@ -295,17 +294,16 @@ public class TestClient
private void disconnect()
{
// TODO Auto-generated method stub
}
private void open() throws Exception
{
WebSocketClient client = factory.newWebSocketClient(socket);
client.getPolicy().setIdleTimeout(_timeout);
client.getUpgradeRequest().setSubProtocols(_protocol);
ClientUpgradeRequest request = new ClientUpgradeRequest();
request.setSubProtocols(_protocol);
socket = new TestSocket();
URI wsUri = new URI("ws://" + _host + ":" + _port + "/");
client.connect(wsUri).get(10,TimeUnit.SECONDS);
client.connect(socket,wsUri,request).get(10,TimeUnit.SECONDS);
}
private void send(byte op, byte[] data, int fragment)

View File

@ -41,6 +41,8 @@ import org.eclipse.jetty.websocket.api.CloseStatus;
import org.eclipse.jetty.websocket.api.RemoteEndpoint;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.SuspendToken;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
@ -244,6 +246,20 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
return timeout;
}
@Override
public UpgradeRequest getUpgradeRequest()
{
// TODO Auto-generated method stub
return null;
}
@Override
public UpgradeResponse getUpgradeResponse()
{
// TODO Auto-generated method stub
return null;
}
/**
* Incoming Errors from Parser
*/
@ -310,8 +326,7 @@ public class WebSocketSession extends ContainerLifeCycle implements Session, Web
this.active = true;
// Open WebSocket
websocket.setSession(this);
websocket.onConnect();
websocket.openSession(this);
if (LOG.isDebugEnabled())
{

View File

@ -20,6 +20,8 @@ package org.eclipse.jetty.websocket.common.events;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -27,7 +29,9 @@ import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.CloseException;
import org.eclipse.jetty.websocket.api.Session;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.extensions.Frame;
@ -45,6 +49,7 @@ public abstract class EventDriver implements IncomingFrames
protected final Logger LOG;
protected final WebSocketPolicy policy;
protected final Object websocket;
protected final CountDownLatch sessionOpenLatch = new CountDownLatch(1);
protected WebSocketSession session;
public EventDriver(WebSocketPolicy policy, Object websocket)
@ -54,6 +59,16 @@ public abstract class EventDriver implements IncomingFrames
this.LOG = Log.getLogger(websocket.getClass());
}
public Session awaitActiveSession(int timeout, TimeUnit unit) throws InterruptedException, IOException
{
if (sessionOpenLatch.await(timeout,unit))
{
return this.session;
}
// TODO: determine if we should invalidate the session in this case?
throw new UpgradeException("Timed out");
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -169,9 +184,11 @@ public abstract class EventDriver implements IncomingFrames
public abstract void onTextMessage(String message);
public void setSession(WebSocketSession session)
public void openSession(WebSocketSession session)
{
this.session = session;
this.onConnect();
sessionOpenLatch.countDown();
}
protected void terminateConnection(int statusCode, String rawreason)