Fixing websocket-client upgrade connection -> websocket connection handling

This commit is contained in:
Joakim Erdfelt 2012-08-17 13:48:04 -07:00
parent ffd86fd21b
commit b36b41afd7
29 changed files with 1771 additions and 452 deletions

View File

@ -16,157 +16,30 @@
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.Map;
import java.util.concurrent.Future;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
public class WebSocketClient
public interface WebSocketClient
{
public static class ConnectFuture extends FutureCallback<WebSocketConnection>
{
private static final Logger LOG = Log.getLogger(ConnectFuture.class);
private final WebSocketClient client;
private final URI websocketUri;
private final WebSocketEventDriver websocket;
public FutureCallback<UpgradeResponse> connect(URI websocketUri) throws IOException;
public ConnectFuture(WebSocketClient client, URI websocketUri, WebSocketEventDriver websocket)
{
this.client = client;
this.websocketUri = websocketUri;
this.websocket = websocket;
}
public WebSocketConnection getConnection();
@Override
public void completed(WebSocketConnection context)
{
LOG.debug("completed() - {}",context);
// TODO Auto-generated method stub
super.completed(context);
}
public WebSocketClientFactory getFactory();
@Override
public void failed(WebSocketConnection context, Throwable cause)
{
LOG.debug("failed() - {}, {}",context,cause);
LOG.info(cause);
// TODO Auto-generated method stub
super.failed(context,cause);
}
public WebSocketPolicy getPolicy();
public WebSocketClient getClient()
{
return client;
}
public UpgradeRequest getUpgradeRequest();
public Map<String, String> getCookies()
{
// TODO Auto-generated method stub
return null;
}
public UpgradeResponse getUpgradeResponse();
public WebSocketClientFactory getFactory()
{
return client.factory;
}
public WebSocketEventDriver getWebSocket();
public String getOrigin()
{
// TODO Auto-generated method stub
return null;
}
public WebSocketEventDriver getWebSocket()
{
return websocket;
}
public URI getWebSocketUri()
{
return websocketUri;
}
}
private static final Logger LOG = Log.getLogger(WebSocketClient.class);
public static InetSocketAddress toSocketAddress(URI uri)
{
return new InetSocketAddress(uri.getHost(),uri.getPort());
}
private final WebSocketClientFactory factory;
private SocketAddress bindAddress;
private WebSocketPolicy policy;
public WebSocketClient(WebSocketClientFactory factory)
{
this.factory = factory;
this.policy = WebSocketPolicy.newClientPolicy();
}
public Future<WebSocketConnection> connect(URI websocketUri, Object websocketPojo) throws IOException
{
if (!factory.isStarted())
{
throw new IllegalStateException(WebSocketClientFactory.class.getSimpleName() + " is not started");
}
SocketChannel channel = SocketChannel.open();
if (bindAddress != null)
{
channel.bind(bindAddress);
}
channel.socket().setTcpNoDelay(true); // disable nagle
channel.configureBlocking(false); // async all the way
InetSocketAddress address = toSocketAddress(websocketUri);
LOG.debug("Connect to {}",address);
WebSocketEventDriver websocket = this.factory.newWebSocketDriver(websocketPojo);
ConnectFuture result = new ConnectFuture(this,websocketUri,websocket);
channel.connect(address);
factory.getSelector().connect(channel,result);
return result;
}
/**
* @return the address to bind the socket channel to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
return bindAddress;
}
public WebSocketPolicy getPolicy()
{
return this.policy;
}
/**
* @param bindAddress
* the address to bind the socket channel to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
this.bindAddress = bindAddress;
}
public void setProtocol(String protocol)
{
// TODO Auto-generated method stub
}
}
public URI getWebSocketUri();
}

View File

@ -15,38 +15,39 @@
//========================================================================
package org.eclipse.jetty.websocket.client;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.net.SocketAddress;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.StandardByteBufferPool;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.io.WebSocketClientSelectorManager;
import org.eclipse.jetty.websocket.client.internal.ConnectionManager;
import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
public class WebSocketClientFactory extends AggregateLifeCycle
{
private static final Logger LOG = Log.getLogger(WebSocketClientFactory.class);
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<>();
private final ByteBufferPool bufferPool = new StandardByteBufferPool();
private final Executor executor;
private final ScheduledExecutorService scheduler;
private final WebSocketClientSelectorManager selector;
private final EventMethodsCache methodsCache;
private final WebSocketPolicy policy;
private final ExtensionRegistry extensionRegistry;
private SocketAddress bindAddress;
private ConnectionManager connectionManager;
public WebSocketClientFactory()
{
@ -84,10 +85,10 @@ public class WebSocketClientFactory extends AggregateLifeCycle
}
this.policy = WebSocketPolicy.newClientPolicy();
this.extensionRegistry = new WebSocketExtensionRegistry(policy,bufferPool);
selector = new WebSocketClientSelectorManager(bufferPool,executor,scheduler,policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);
this.connectionManager = new ConnectionManager(bufferPool,executor,scheduler,sslContextFactory,policy);
addBean(this.connectionManager);
this.methodsCache = new EventMethodsCache();
}
@ -97,20 +98,15 @@ public class WebSocketClientFactory extends AggregateLifeCycle
this(new QueuedThreadPool(),Executors.newSingleThreadScheduledExecutor(),sslContextFactory);
}
private void closeConnections()
/**
* The address to bind local physical (outgoing) TCP Sockets to.
*
* @return the address to bind the socket channel to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
for (WebSocketConnection connection : connections)
{
connection.close();
}
connections.clear();
}
@Override
protected void doStop() throws Exception
{
closeConnections();
super.doStop();
return bindAddress;
}
public ByteBufferPool getBufferPool()
@ -118,9 +114,9 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return bufferPool;
}
protected Collection<WebSocketConnection> getConnections()
public ConnectionManager getConnectionManager()
{
return Collections.unmodifiableCollection(connections);
return connectionManager;
}
public Executor getExecutor()
@ -128,6 +124,11 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return executor;
}
public ExtensionRegistry getExtensionRegistry()
{
return extensionRegistry;
}
public WebSocketPolicy getPolicy()
{
return policy;
@ -138,18 +139,20 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return scheduler;
}
public SelectorManager getSelector()
public WebSocketClient newWebSocketClient(Object websocketPojo)
{
return selector;
LOG.debug("Creating new WebSocket for {}",websocketPojo);
WebSocketEventDriver websocket = new WebSocketEventDriver(websocketPojo,methodsCache,policy,getBufferPool());
return new IWebSocketClient(this,websocket);
}
public WebSocketClient newWebSocketClient()
/**
* @param bindAddress
* the address to bind the socket channel to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
return new WebSocketClient(this);
}
protected WebSocketEventDriver newWebSocketDriver(Object websocketPojo)
{
return new WebSocketEventDriver(websocketPojo,methodsCache,policy,getBufferPool());
this.bindAddress = bindAddress;
}
}

View File

@ -13,55 +13,36 @@
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.io;
package org.eclipse.jetty.websocket.client.internal;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
/**
* Default Handshake Connection.
* <p>
* Results in a {@link AbstractWebSocketConnection} on successful handshake.
* Allowing a generate from a UpgradeRequest
*/
public class HandshakeConnection extends AbstractConnection implements Connection
public class ClientUpgradeRequest implements UpgradeRequest
{
public static final String COOKIE_DELIM = "\"\\\n\r\t\f\b%+ ;=";
private final WebSocketClient.ConnectFuture future;
private final ByteBufferPool bufferPool;
private final String key;
private String key;
public HandshakeConnection(EndPoint endp, Executor executor, ByteBufferPool bufferPool, WebSocketClient.ConnectFuture future)
public ClientUpgradeRequest()
{
super(endp,executor);
this.future = future;
this.bufferPool = bufferPool;
byte[] bytes = new byte[16];
new Random().nextBytes(bytes);
this.key = new String(B64Code.encode(bytes));
}
public void handshake() throws InterruptedException, ExecutionException
public String generate(URI uri)
{
URI uri = future.getWebSocketUri();
StringBuilder request = new StringBuilder(512);
request.append("GET ");
if (StringUtil.isBlank(uri.getPath()))
@ -88,14 +69,14 @@ public class HandshakeConnection extends AbstractConnection implements Connectio
request.append("Connection: Upgrade\r\n");
request.append("Sec-WebSocket-Key: ").append(key).append("\r\n");
if (StringUtil.isNotBlank(future.getOrigin()))
if (StringUtil.isNotBlank(getOrigin()))
{
request.append("Origin: ").append(future.getOrigin()).append("\r\n");
request.append("Origin: ").append(getOrigin()).append("\r\n");
}
request.append("Sec-WebSocket-Version: 13\r\n"); // RFC-6455 specified version
Map<String, String> cookies = future.getCookies();
Map<String, String> cookies = getCookieMap();
if ((cookies != null) && (cookies.size() > 0))
{
for (String cookie : cookies.keySet())
@ -109,28 +90,82 @@ public class HandshakeConnection extends AbstractConnection implements Connectio
}
request.append("\r\n");
// TODO: extensions
// TODO: write to connection
byte rawreq[] = StringUtil.getUtf8Bytes(request.toString());
ByteBuffer buf = bufferPool.acquire(rawreq.length,false);
try
{
FutureCallback<ConnectFuture> callback = new FutureCallback<>();
getEndPoint().write(future,callback,buf);
// TODO: read response & upgrade via async callback
callback.get(); // TODO: block on read?
}
finally
{
bufferPool.release(buf);
}
return request.toString();
}
@Override
public void onFillable()
public Map<String, String> getCookieMap()
{
// TODO Auto-generated method stub
return null;
}
@Override
public List<ExtensionConfig> getExtensions()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHeader(String name)
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHost()
{
// TODO Auto-generated method stub
return null;
}
@Override
public String getHttpEndPointName()
{
// TODO Auto-generated method stub
return null;
}
public String getKey()
{
return key;
}
@Override
public String getOrigin()
{
// TODO Auto-generated method stub
return null;
}
@Override
public List<String> getSubProtocols()
{
// TODO Auto-generated method stub
return null;
}
@Override
public boolean hasSubProtocol(String test)
{
// TODO Auto-generated method stub
return false;
}
@Override
public boolean isOrigin(String test)
{
// TODO Auto-generated method stub
return false;
}
@Override
public void setSubProtocols(String string)
{
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,157 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.util.MultiMap;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class ClientUpgradeResponse implements UpgradeResponse
{
public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";
private int statusCode;
private String statusReason;
private MultiMap<String> headers;
private List<ExtensionConfig> extensions;
private boolean success = false;
public ClientUpgradeResponse()
{
headers = new MultiMap<>();
}
@Override
public void addHeader(String name, String value)
{
headers.add(name.toLowerCase(),value);
}
@Override
public String getAcceptedSubProtocol()
{
return headers.getValue(SEC_WEBSOCKET_PROTOCOL,0);
}
@Override
public List<ExtensionConfig> getExtensions()
{
return extensions;
}
@Override
public Set<String> getHeaderNamesSet()
{
return headers.keySet();
}
@Override
public String getHeaderValue(String name)
{
return headers.getValue(name.toLowerCase(),0);
}
@Override
public Iterator<String> getHeaderValues(String name)
{
List<String> values = headers.getValues(name);
if (values == null)
{
return Collections.emptyIterator();
}
return values.iterator();
}
@Override
public int getStatusCode()
{
return statusCode;
}
@Override
public String getStatusReason()
{
return statusReason;
}
@Override
public boolean isSuccess()
{
return success;
}
@Override
public void sendForbidden(String message) throws IOException
{
throw new UnsupportedOperationException("Not supported on client implementation");
}
@Override
public void setAcceptedSubProtocol(String protocol)
{
headers.put(SEC_WEBSOCKET_PROTOCOL,protocol);
}
@Override
public void setExtensions(List<ExtensionConfig> extensions)
{
if (this.extensions == null)
{
this.extensions = new ArrayList<>();
}
else
{
this.extensions.clear();
}
this.extensions.addAll(extensions);
}
@Override
public void setHeader(String name, String value)
{
headers.putValues(name,value);
}
public void setStatusCode(int statusCode)
{
this.statusCode = statusCode;
}
public void setStatusReason(String statusReason)
{
this.statusReason = statusReason;
}
@Override
public void validateWebSocketHash(String expectedHash) throws UpgradeException
{
String respHash = getHeaderValue("Sec-WebSocket-Accept");
success = true;
if (expectedHash.equals(respHash) == false)
{
success = false;
throw new UpgradeException("Invalid Sec-WebSocket-Accept hash");
}
}
}

View File

@ -0,0 +1,152 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.SocketChannel;
import java.util.Collection;
import java.util.Collections;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.internal.io.WebSocketClientSelectorManager;
/**
* Internal Connection/Client Manager used to track active clients, their physical vs virtual connection information, and provide some means to create new
* physical or virtual connections.
*/
public class ConnectionManager extends AggregateLifeCycle
{
private static final Logger LOG = Log.getLogger(ConnectionManager.class);
public static InetSocketAddress toSocketAddress(URI uri)
{
if (!uri.isAbsolute())
{
throw new IllegalArgumentException("Cannot get InetSocketAddress of non-absolute URIs");
}
int port = uri.getPort();
String scheme = uri.getScheme().toLowerCase();
if ("ws".equals(scheme))
{
if (port == (-1))
{
port = 80;
}
}
else if ("wss".equals(scheme))
{
if (port == (-1))
{
port = 443;
}
}
else
{
throw new IllegalArgumentException("Only support ws:// and wss:// URIs");
}
return new InetSocketAddress(uri.getHost(),port);
}
private final Queue<WebSocketClient> clients = new ConcurrentLinkedQueue<>();
private final WebSocketClientSelectorManager selector;
public ConnectionManager(ByteBufferPool bufferPool, Executor executor, ScheduledExecutorService scheduler, SslContextFactory sslContextFactory,
WebSocketPolicy policy)
{
selector = new WebSocketClientSelectorManager(bufferPool,executor,scheduler,policy);
selector.setSslContextFactory(sslContextFactory);
addBean(selector);
}
public void addClient(WebSocketClient client)
{
clients.add(client);
}
private void closeConnections()
{
for (WebSocketClient client : clients)
{
if (client.getConnection() != null)
{
client.getConnection().close();
}
}
}
public FutureCallback<UpgradeResponse> connectPhysical(IWebSocketClient client) throws IOException
{
SocketChannel channel = SocketChannel.open();
SocketAddress bindAddress = client.getFactory().getBindAddress();
if (bindAddress != null)
{
channel.bind(bindAddress);
}
URI wsUri = client.getWebSocketUri();
channel.socket().setTcpNoDelay(true); // disable nagle
channel.configureBlocking(false); // async allways
InetSocketAddress address = toSocketAddress(wsUri);
LOG.debug("Connect to {}",address);
channel.connect(address);
getSelector().connect(channel,client);
return client;
}
public FutureCallback<UpgradeResponse> connectVirtual(WebSocketClient client)
{
// TODO Auto-generated method stub
return null;
}
@Override
protected void doStop() throws Exception
{
closeConnections();
clients.clear();
super.doStop();
}
public Collection<WebSocketClient> getClients()
{
return Collections.unmodifiableCollection(clients);
}
public WebSocketClientSelectorManager getSelector()
{
return selector;
}
}

View File

@ -0,0 +1,215 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal;
import java.io.IOException;
import java.net.URI;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
/**
* WebSocketClient for working with Upgrade (request and response), and establishing connections to the websocket URI of your choice.
*/
public class IWebSocketClient extends FutureCallback<UpgradeResponse> implements WebSocketClient
{
private static final Logger LOG = Log.getLogger(IWebSocketClient.class);
private final WebSocketClientFactory factory;
private final WebSocketPolicy policy;
private final WebSocketEventDriver websocket;
private URI websocketUri;
/**
* The abstract WebSocketConnection in use and established for this client.
* <p>
* Note: this is intentionally kept neutral, as WebSocketClient must be able to handle connections that are either physical (a socket connection) or virtual
* (eg: a mux connection).
*/
private WebSocketConnection connection;
private ClientUpgradeRequest upgradeRequest;
private ClientUpgradeResponse upgradeResponse;
public IWebSocketClient(WebSocketClientFactory factory, WebSocketEventDriver websocket)
{
this.factory = factory;
this.policy = factory.getPolicy();
this.websocket = websocket;
this.upgradeRequest = new ClientUpgradeRequest();
}
@Override
public void completed(UpgradeResponse context)
{
LOG.debug("completed() - {}",context);
// TODO Auto-generated method stub
super.completed(context);
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#connect(java.net.URI)
*/
@Override
public FutureCallback<UpgradeResponse> connect(URI websocketUri) throws IOException
{
if (!factory.isStarted())
{
throw new IllegalStateException(WebSocketClientFactory.class.getSimpleName() + " is not started");
}
// Validate websocket URI
if (!websocketUri.isAbsolute())
{
throw new IllegalArgumentException("WebSocket URI must be absolute");
}
if (StringUtil.isBlank(websocketUri.getScheme()))
{
throw new IllegalArgumentException("WebSocket URI must include a scheme");
}
String scheme = websocketUri.getScheme().toLowerCase();
if (("ws".equals(scheme) == false) && ("wss".equals(scheme) == false))
{
throw new IllegalArgumentException("WebSocket URI scheme only supports [ws] and [wss], not [" + scheme + "]");
}
this.websocketUri = websocketUri;
// Validate websocket URI
FutureCallback<UpgradeResponse> result = null;
LOG.debug("connect({})",websocketUri);
ConnectionManager manager = factory.getConnectionManager();
// Check with factory first for possible alternate connect mechanism (such as mux)
result = manager.connectVirtual(this);
if (result == null)
{
// No such connect option, attempt to use a physical connection
result = manager.connectPhysical(this);
}
return result;
}
@Override
public void failed(UpgradeResponse context, Throwable cause)
{
LOG.debug("failed() - {}, {}",context,cause);
LOG.info(cause);
// TODO Auto-generated method stub
super.failed(context,cause);
}
protected ClientUpgradeRequest getClientUpgradeRequest()
{
return upgradeRequest;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getConnection()
*/
@Override
public WebSocketConnection getConnection()
{
return this.connection;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getFactory()
*/
@Override
public WebSocketClientFactory getFactory()
{
return factory;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getPolicy()
*/
@Override
public WebSocketPolicy getPolicy()
{
return this.policy;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getUpgradeRequest()
*/
@Override
public UpgradeRequest getUpgradeRequest()
{
return upgradeRequest;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getUpgradeResponse()
*/
@Override
public UpgradeResponse getUpgradeResponse()
{
return upgradeResponse;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getWebSocket()
*/
@Override
public WebSocketEventDriver getWebSocket()
{
return websocket;
}
/*
* (non-Javadoc)
*
* @see org.eclipse.jetty.websocket.client.internal.WebSocketClient#getWebSocketUri()
*/
@Override
public URI getWebSocketUri()
{
return websocketUri;
}
public void setUpgradeResponse(UpgradeResponse response)
{
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,122 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal.io;
import java.nio.ByteBuffer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8LineParser;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.internal.ClientUpgradeResponse;
/**
* Responsible for reading UTF8 Response Header lines and parsing them into a provided UpgradeResponse object.
*/
public class HttpResponseHeaderParser
{
private enum State
{
STATUS_LINE,
HEADER,
END
}
private static final Pattern PAT_HEADER = Pattern.compile("([^:]+):\\s*(.*)");
private static final Pattern PAT_STATUS_LINE = Pattern.compile("^HTTP/1.[01]\\s+(\\d+)\\s+(.*)",Pattern.CASE_INSENSITIVE);
private ClientUpgradeResponse response;
private Utf8LineParser lineParser;
private State state;
public HttpResponseHeaderParser()
{
this.lineParser = new Utf8LineParser();
this.state = State.STATUS_LINE;
}
public boolean isDone()
{
return (state == State.END);
}
public UpgradeResponse parse(ByteBuffer buf) throws UpgradeException
{
while (!isDone() && (buf.remaining() > 0))
{
String line = lineParser.parse(buf);
if (line != null)
{
if (parseHeader(line))
{
return this.response;
}
}
}
return null;
}
private boolean parseHeader(String line)
{
switch (state)
{
case STATUS_LINE:
{
this.response = new ClientUpgradeResponse();
Matcher mat = PAT_STATUS_LINE.matcher(line);
if (!mat.matches())
{
throw new UpgradeException("Unexpected HTTP upgrade response status line [" + line + "]");
}
try
{
response.setStatusCode(Integer.parseInt(mat.group(1)));
}
catch (NumberFormatException e)
{
throw new UpgradeException("Unexpected HTTP upgrade response status code",e);
}
response.setStatusReason(mat.group(2));
state = State.HEADER;
break;
}
case HEADER:
{
if (StringUtil.isBlank(line))
{
state = State.END;
return parseHeader(line);
}
Matcher header = PAT_HEADER.matcher(line);
if (header.matches())
{
String headerName = header.group(1);
String headerValue = header.group(2);
response.addHeader(headerName,headerValue);
}
break;
}
case END:
state = State.STATUS_LINE;
return true;
}
return false;
}
}

View File

@ -0,0 +1,234 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal.io;
import java.io.IOException;
import java.net.URI;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.client.internal.ClientUpgradeRequest;
import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
/**
* This is the initial connection handling that exists immediately after physical connection is established to destination server.
* <p>
* Eventually, upon successful Upgrade request/response, this connection swaps itself out for the WebSocektClientConnection handler.
*/
public class UpgradeConnection extends AbstractConnection
{
public class SendUpgradeRequest extends FutureCallback<String> implements Runnable
{
@Override
public void completed(String context)
{
// Writing the request header is complete.
super.completed(context);
// start the interest in fill
fillInterested();
}
@Override
public void run()
{
URI uri = client.getWebSocketUri();
String rawRequest = request.generate(uri);
ByteBuffer buf = BufferUtil.toBuffer(rawRequest,StringUtil.__UTF8_CHARSET);
getEndPoint().write("REQ",this,buf);
}
}
private static final Logger LOG = Log.getLogger(UpgradeConnection.class);
private final ByteBufferPool bufferPool;
private final ScheduledExecutorService scheduler;
private final IWebSocketClient client;
private final HttpResponseHeaderParser parser;
private ClientUpgradeRequest request;
public UpgradeConnection(EndPoint endp, Executor executor, IWebSocketClient client)
{
super(endp,executor);
this.client = client;
this.bufferPool = client.getFactory().getBufferPool();
this.scheduler = client.getFactory().getScheduler();
this.parser = new HttpResponseHeaderParser();
try
{
this.request = (ClientUpgradeRequest)client.getUpgradeRequest();
}
catch (ClassCastException e)
{
client.failed(null,new RuntimeException("Invalid Upgrade Request structure",e));
}
}
public void disconnect(boolean onlyOutput)
{
EndPoint endPoint = getEndPoint();
// We need to gently close first, to allow
// SSL close alerts to be sent by Jetty
LOG.debug("Shutting down output {}",endPoint);
endPoint.shutdownOutput();
if (!onlyOutput)
{
LOG.debug("Closing {}",endPoint);
endPoint.close();
}
}
private void notifyConnect()
{
client.completed(client.getUpgradeResponse());
}
@Override
public void onFillable()
{
int bufSize = client.getPolicy().getBufferSize();
ByteBuffer buffer = bufferPool.acquire(bufSize,false);
BufferUtil.clear(buffer);
boolean readMore = false;
try
{
readMore = read(buffer);
}
finally
{
bufferPool.release(buffer);
}
if (readMore)
{
fillInterested();
}
}
@Override
public void onOpen()
{
super.onOpen();
// TODO: handle timeout
getExecutor().execute(new SendUpgradeRequest());
}
/**
* Read / Parse the waiting read/fill buffer
*
* @param buffer
* the buffer to fill into from the endpoint
* @return true if there is more to read, false if reading should stop
*/
private boolean read(ByteBuffer buffer)
{
EndPoint endPoint = getEndPoint();
try
{
while (true)
{
int filled = endPoint.fill(buffer);
if (filled == 0)
{
return true;
}
else if (filled < 0)
{
LOG.debug("read - EOF Reached");
return false;
}
else
{
if (LOG.isDebugEnabled())
{
LOG.debug("Filled {} bytes - {}",filled,BufferUtil.toDetailString(buffer));
}
UpgradeResponse resp = parser.parse(buffer);
if (resp != null)
{
// Got a response!
client.setUpgradeResponse(resp);
validateResponse(resp);
notifyConnect();
upgradeConnection();
return false; // do no more reading
}
}
}
}
catch (IOException e)
{
LOG.warn(e);
client.failed(null,e);
disconnect(false);
return false;
}
catch (UpgradeException e)
{
LOG.warn(e);
client.failed(null,e);
disconnect(false);
return false;
}
}
private void upgradeConnection()
{
EndPoint endp = getEndPoint();
Executor executor = getExecutor();
WebSocketClientConnection conn = new WebSocketClientConnection(endp,executor,client);
endp.setConnection(conn);
}
private void validateResponse(UpgradeResponse response)
{
// Check the Accept hash
String reqKey = request.getKey();
String expectedHash = AcceptHash.hashKey(reqKey);
response.validateWebSocketHash(expectedHash);
// Parse extensions
List<ExtensionConfig> extensions = new ArrayList<>();
Iterator<String> extIter = response.getHeaderValues("Sec-WebSocket-Extensions");
while (extIter.hasNext())
{
String extVal = extIter.next();
QuotedStringTokenizer tok = new QuotedStringTokenizer(extVal,",");
while (tok.hasMoreTokens())
{
extensions.add(ExtensionConfig.parse(tok.nextToken()));
}
}
response.setExtensions(extensions);
}
}

View File

@ -0,0 +1,44 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal.io;
import java.util.concurrent.Executor;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
public class WebSocketClientConnection extends AbstractWebSocketConnection
{
private final IWebSocketClient client;
public WebSocketClientConnection(EndPoint endp, Executor executor, IWebSocketClient client)
{
super(endp,executor,client.getFactory().getScheduler(),client.getPolicy(),client.getFactory().getBufferPool());
this.client = client;
}
public IWebSocketClient getClient()
{
return client;
}
@Override
public void onOpen()
{
super.onOpen();
}
}

View File

@ -13,7 +13,7 @@
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.io;
package org.eclipse.jetty.websocket.client.internal.io;
import java.io.IOException;
import java.nio.channels.SelectionKey;
@ -33,11 +33,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
import org.eclipse.jetty.websocket.client.internal.IWebSocketClient;
public class WebSocketClientSelectorManager extends SelectorManager
{
@ -72,11 +69,11 @@ public class WebSocketClientSelectorManager extends SelectorManager
public Connection newConnection(final SocketChannel channel, EndPoint endPoint, final Object attachment) throws IOException
{
LOG.debug("newConnection({},{},{})",channel,endPoint,attachment);
WebSocketClient.ConnectFuture confut = (WebSocketClient.ConnectFuture)attachment;
IWebSocketClient client = (IWebSocketClient)attachment;
try
{
String scheme = confut.getWebSocketUri().getScheme();
String scheme = client.getWebSocketUri().getScheme();
if ("wss".equalsIgnoreCase(scheme))
{
@ -87,7 +84,7 @@ public class WebSocketClientSelectorManager extends SelectorManager
SslConnection sslConnection = new SslConnection(bufferPool,executor,endPoint,engine);
EndPoint sslEndPoint = sslConnection.getDecryptedEndPoint();
Connection connection = newWebSocketConnection(channel,sslEndPoint,confut);
Connection connection = newUpgradeConnection(channel,sslEndPoint,client);
sslEndPoint.setConnection(connection);
connectionOpened(connection);
return sslConnection;
@ -101,13 +98,13 @@ public class WebSocketClientSelectorManager extends SelectorManager
else
{
// Standard "ws://"
return newWebSocketConnection(channel,endPoint,confut);
return newUpgradeConnection(channel,endPoint,client);
}
}
catch (IOException e)
{
LOG.debug(e);
confut.failed(null,e);
client.failed(null,e);
// rethrow
throw e;
}
@ -129,20 +126,14 @@ public class WebSocketClientSelectorManager extends SelectorManager
return engine;
}
public AbstractWebSocketConnection newWebSocketConnection(SocketChannel channel, EndPoint endPoint, ConnectFuture confut)
public UpgradeConnection newUpgradeConnection(SocketChannel channel, EndPoint endPoint, IWebSocketClient client)
{
WebSocketClientFactory factory = confut.getFactory();
WebSocketEventDriver websocket = confut.getWebSocket();
WebSocketClientFactory factory = client.getFactory();
Executor executor = factory.getExecutor();
WebSocketPolicy policy = factory.getPolicy();
ByteBufferPool bufferPool = factory.getBufferPool();
ScheduledExecutorService scheduler = factory.getScheduler();
UpgradeConnection connection = new UpgradeConnection(endPoint,executor,client);
AbstractWebSocketConnection connection = new WebSocketClientConnection(endPoint,executor,scheduler,policy,bufferPool,factory,confut);
connection.getParser().setIncomingFramesHandler(websocket);
// TODO: track open websockets? bind open websocket to connection?
// track the client
factory.getConnectionManager().addClient(client);
return connection;
}

View File

@ -1,34 +0,0 @@
package org.eclipse.jetty.websocket.client.io;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.client.WebSocketClient.ConnectFuture;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
import org.eclipse.jetty.websocket.io.AbstractWebSocketConnection;
public class WebSocketClientConnection extends AbstractWebSocketConnection
{
private final WebSocketClientFactory factory;
private final ConnectFuture connectFuture;
public WebSocketClientConnection(EndPoint endp, Executor executor, ScheduledExecutorService scheduler, WebSocketPolicy policy, ByteBufferPool bufferPool,
WebSocketClientFactory factory, ConnectFuture confut)
{
super(endp,executor,scheduler,policy,bufferPool);
this.factory = factory;
this.connectFuture = confut;
}
@Override
public void onOpen()
{
super.onOpen();
// TODO: Handshake handshake = new WebSocket13Handshake(this);
// TODO: getExecutor().execute(handshake);
}
}

View File

@ -44,6 +44,12 @@ public class TrackingSocket extends WebSocketAdapter
assertNotClosed();
}
public void assertMessage(String string)
{
// TODO Auto-generated method stub
}
public void assertNotClosed()
{
Assert.assertThat("Close Code",close.get(),is(-1));
@ -68,6 +74,7 @@ public class TrackingSocket extends WebSocketAdapter
@Override
public void onWebSocketClose(int statusCode, String reason)
{
super.onWebSocketClose(statusCode,reason);
close.set(statusCode);
closeMessage.append(reason);
closeLatch.countDown();
@ -76,6 +83,7 @@ public class TrackingSocket extends WebSocketAdapter
@Override
public void onWebSocketConnect(WebSocketConnection connection)
{
super.onWebSocketConnect(connection);
open.set(true);
openLatch.countDown();
}
@ -86,4 +94,10 @@ public class TrackingSocket extends WebSocketAdapter
dataLatch.countDown();
messageQueue.add(message);
}
public void waitForResponseMessage()
{
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,80 @@
package org.eclipse.jetty.websocket.client;
import java.net.URI;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.runners.Parameterized.Parameters;
@RunWith(Parameterized.class)
public class WebSocketClientBadUriTest
{
@Parameters
public static Collection<String[]> data()
{
List<String[]> data = new ArrayList<>();
// @formatter:off
// - not using right scheme
data.add(new String[] { "http://localhost" });
data.add(new String[] { "https://localhost" });
data.add(new String[] { "file://localhost" });
data.add(new String[] { "content://localhost" });
data.add(new String[] { "jar://localhost" });
// - non-absolute uri
data.add(new String[] { "/mysocket" });
data.add(new String[] { "/sockets/echo" });
data.add(new String[] { "#echo" });
data.add(new String[] { "localhost:8080/echo" });
// @formatter:on
return data;
}
private WebSocketClientFactory factory;
private final String uriStr;
private final URI uri;
public WebSocketClientBadUriTest(String rawUri)
{
this.uriStr = rawUri;
this.uri = URI.create(uriStr);
}
@Before
public void startFactory() throws Exception
{
factory = new WebSocketClientFactory();
factory.start();
}
@After
public void stopFactory() throws Exception
{
factory.stop();
}
@Test
public void testBadURI() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
try
{
client.connect(uri); // should toss exception
Assert.fail("Expected IllegalArgumentException");
}
catch (IllegalArgumentException e)
{
// expected path
wsocket.assertNotOpened();
}
}
}

View File

@ -0,0 +1,38 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import org.junit.Assert;
import org.junit.Test;
public class WebSocketClientFactoryTest
{
@Test
public void testNewSocket()
{
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient(new TrackingSocket());
Assert.assertThat("Client",client,notNullValue());
Assert.assertThat("Client.factory",client.getFactory(),is(factory));
Assert.assertThat("Client.policy",client.getPolicy(),is(factory.getPolicy()));
Assert.assertThat("Client.upgradeRequest",client.getUpgradeRequest(),notNullValue());
Assert.assertThat("Client.upgradeResponse",client.getUpgradeResponse(),nullValue());
}
}

View File

@ -17,9 +17,7 @@ package org.eclipse.jetty.websocket.client;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
@ -33,16 +31,22 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.FutureCallback;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
@Ignore("work in progress")
public class WebSocketClientTest
{
private BlockheadServer server;
@ -74,97 +78,114 @@ public class WebSocketClientTest
server.stop();
}
@Test
@Test(expected = UpgradeException.class)
public void testBadHandshake() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
String req = connection.readRequest();
connection.readRequest();
// no upgrade, just fail with a 404 error
connection.respond("HTTP/1.1 404 NOT FOUND\r\n\r\n");
Throwable error = null;
// The attempt to get upgrade response future should throw error
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail("Should have resulted in an ExecutionException -> IOException");
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
error = e.getCause();
FutureCallback.rethrow(e);
}
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.PROTOCOL);
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND") > 0);
}
@Test
@Test(expected = UpgradeException.class)
public void testBadUpgrade() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
FutureCallback<UpgradeResponse> future = client.connect(wsUri);
ServerConnection connection = server.accept();
connection.readRequest();
// Upgrade badly
connection.respond("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
Throwable error = null;
// The attempt to get upgrade response future should throw error
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> UpgradeException");
}
catch (ExecutionException e)
{
error = e.getCause();
FutureCallback.rethrow(e);
}
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.PROTOCOL);
Assert.assertTrue(error instanceof IOException);
Assert.assertThat("Error Message",error.getMessage(),containsString("Bad Sec-WebSocket-Accept"));
}
@Test
public void testBadURL() throws Exception
public void testBasicEcho_FromClient() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket cliSock = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(cliSock);
client.getPolicy().setIdleTimeout(10000);
URI wsUri = server.getWsUri();
UpgradeRequest request = client.getUpgradeRequest();
request.setSubProtocols("echo");
Future<UpgradeResponse> future = client.connect(wsUri);
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
UpgradeResponse resp = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertThat("Response",resp,notNullValue());
Assert.assertEquals("Response.success",resp.isSuccess(),is(true));
cliSock.assertWasOpened();
cliSock.assertNotClosed();
Assert.assertThat("Factory.sockets.size",factory.getConnectionManager().getClients().size(),is(1));
cliSock.getConnection().write(null,new FutureCallback<Void>(),"Hello World!");
srvSock.echoMessage();
// wait for response from server
cliSock.waitForResponseMessage();
cliSock.assertMessage("Hello World!");
}
@Test
public void testBasicEcho_FromServer() throws Exception
{
TrackingSocket wsocket = new TrackingSocket();
try
{
// Intentionally bad scheme in URI
URI wsUri = new URI("http://localhost:8080");
WebSocketClient client = factory.newWebSocketClient(wsocket);
Future<UpgradeResponse> future = client.connect(server.getWsUri());
client.connect(wsUri,wsocket); // should toss exception
// Server
final ServerConnection srvSock = server.accept();
srvSock.upgrade();
Assert.fail("Expected IllegalArgumentException");
}
catch (IllegalArgumentException e)
{
// expected path
wsocket.assertNotOpened();
}
// Have server send initial message
srvSock.write(WebSocketFrame.text("Hello World"));
// Verify connect
future.get(500,TimeUnit.MILLISECONDS);
wsocket.assertMessage("Hello world");
}
@Test
public void testBlockReceiving() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setIdleTimeout(60000);
final AtomicBoolean open = new AtomicBoolean(false);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
@ -201,16 +222,16 @@ public class WebSocketClientTest
}
};
WebSocketClient client = factory.newWebSocketClient(socket);
client.getPolicy().setIdleTimeout(60000);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,socket);
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
UpgradeResponse resp = future.get(250,TimeUnit.MILLISECONDS);
// define some messages to send server to client
byte[] send = new byte[]
@ -290,21 +311,17 @@ public class WebSocketClientTest
@Test
public void testBlockSending() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(10000);
TrackingSocket wsocket = new TrackingSocket();
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
final ServerConnection ssocket = server.accept();
ssocket.upgrade();
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
wsocket.assertWasOpened();
wsocket.assertNotClosed();
UpgradeResponse resp = future.get(250,TimeUnit.MILLISECONDS);
final int messages = 200000;
final AtomicLong totalB = new AtomicLong();
@ -345,7 +362,7 @@ public class WebSocketClientTest
String mesg = "This is a test message to send";
for (int i = 0; i < messages; i++)
{
connection.write(null,new FutureCallback<Void>(),mesg);
wsocket.getConnection().write(null,new FutureCallback<Void>(),mesg);
}
// Duration for the write phase
@ -366,102 +383,90 @@ public class WebSocketClientTest
@Test
public void testConnectionNotAccepted() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
// Intentionally not accept incoming socket.
// server.accept();
try
{
future.get(250,TimeUnit.MILLISECONDS);
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Should have Timed Out");
}
catch (TimeoutException e)
{
// Expected Path
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
}
@Test
@Test(expected = ConnectException.class)
public void testConnectionRefused() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
Throwable error = null;
// The attempt to get upgrade response future should throw error
try
{
future.get(1,TimeUnit.SECONDS);
Assert.fail("Expected ExecutionException");
future.get(1000,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> ConnectException");
}
catch (ExecutionException e)
{
error = e.getCause();
FutureCallback.rethrow(e);
}
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
Assert.assertTrue(error instanceof ConnectException);
}
@Test
@Test(expected = TimeoutException.class)
public void testConnectionTimeout() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);
// Intentionally don't upgrade
// ssocket.upgrade();
// The attempt to get upgrade response future should throw error
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail("Expected Timeout Exception");
future.get(500,TimeUnit.MILLISECONDS);
Assert.fail("Expected ExecutionException -> TimeoutException");
}
catch (TimeoutException e)
catch (ExecutionException e)
{
// Expected path
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
FutureCallback.rethrow(e);
}
}
@Test
public void testIdle() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setIdleTimeout(500);
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(500);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
wsocket.assertWasOpened();
wsocket.assertNotClosed();
future.get(250,TimeUnit.MILLISECONDS);
long start = System.currentTimeMillis();
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
@ -472,50 +477,60 @@ public class WebSocketClientTest
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
int bufferSize = 512;
factory.getPolicy().setBufferSize(512);
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
Assert.assertTrue(wsocket.openLatch.await(1,TimeUnit.SECONDS));
int length = bufferSize + (bufferSize / 2); // 1.5 times buffer size
ssocket.write(0x80 | 0x01); // FIN + TEXT
ssocket.write(0x7E); // No MASK and 2 bytes length
ssocket.write(length >> 8); // first length byte
ssocket.write(length & 0xFF); // second length byte
for (int i = 0; i < length; ++i)
WebSocketClientFactory factSmall = new WebSocketClientFactory();
factSmall.start();
try
{
ssocket.write('x');
}
ssocket.flush();
int bufferSize = 512;
factSmall.getPolicy().setBufferSize(512);
Assert.assertTrue(wsocket.dataLatch.await(1000,TimeUnit.SECONDS));
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factSmall.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
future.get(500,TimeUnit.MILLISECONDS);
Assert.assertTrue(wsocket.openLatch.await(1,TimeUnit.SECONDS));
int length = bufferSize + (bufferSize / 2); // 1.5 times buffer size
ssocket.write(0x80 | 0x01); // FIN + TEXT
ssocket.write(0x7E); // No MASK and 2 bytes length
ssocket.write(length >> 8); // first length byte
ssocket.write(length & 0xFF); // second length byte
for (int i = 0; i < length; ++i)
{
ssocket.write('x');
}
ssocket.flush();
Assert.assertTrue(wsocket.dataLatch.await(1000,TimeUnit.SECONDS));
}
finally
{
factSmall.stop();
}
}
@Test
public void testNotIdle() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setIdleTimeout(500);
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
client.getPolicy().setIdleTimeout(500);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
future.get(250,TimeUnit.MILLISECONDS);
wsocket.assertIsOpen();
@ -525,7 +540,7 @@ public class WebSocketClientTest
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
connection.write(null,new FutureCallback<Void>(),"Hello");
wsocket.getConnection().write(null,new FutureCallback<Void>(),"Hello");
len = ssocket.getInputStream().read(recv,0,recv.length);
Assert.assertTrue(len > 0);
}
@ -556,18 +571,16 @@ public class WebSocketClientTest
@Test
public void testUpgradeThenTCPClose() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
TrackingSocket wsocket = new TrackingSocket();
WebSocketClient client = factory.newWebSocketClient(wsocket);
URI wsUri = server.getWsUri();
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Future<UpgradeResponse> future = client.connect(wsUri);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
future.get(500,TimeUnit.MILLISECONDS);
wsocket.assertIsOpen();
@ -576,23 +589,4 @@ public class WebSocketClientTest
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
@Test
public void testURIWithDefaultPort() throws Exception
{
URI uri = new URI("ws://localhost");
InetSocketAddress addr = WebSocketClient.toSocketAddress(uri);
Assert.assertThat("URI (" + uri + ").host",addr.getHostName(),is("localhost"));
Assert.assertThat("URI (" + uri + ").port",addr.getPort(),is(80));
}
@Test
public void testURIWithDefaultWSSPort() throws Exception
{
URI uri = new URI("wss://localhost");
InetSocketAddress addr = WebSocketClient.toSocketAddress(uri);
Assert.assertThat("URI (" + uri + ").host",addr.getHostName(),is("localhost"));
Assert.assertThat("URI (" + uri + ").port",addr.getPort(),is(443));
}
}

View File

@ -33,6 +33,7 @@ import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.protocol.AcceptHash;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
/**
@ -58,6 +59,12 @@ public class BlockheadServer
this.socket.close();
}
public void echoMessage()
{
// TODO Auto-generated method stub
}
public void flush() throws IOException
{
getOutputStream().flush();
@ -152,6 +159,12 @@ public class BlockheadServer
{
getOutputStream().write(b);
}
public void write(WebSocketFrame frame)
{
// TODO Auto-generated method stub
}
}
private static final Logger LOG = Log.getLogger(BlockheadServer.class);

View File

@ -320,12 +320,12 @@ public class TestClient
private void open() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
WebSocketClient client = factory.newWebSocketClient(socket);
client.getPolicy().setIdleTimeout(_timeout);
client.setProtocol(_protocol);
client.getUpgradeRequest().setSubProtocols(_protocol);
socket = new TestSocket();
URI wsUri = new URI("ws://" + _host + ":" + _port + "/");
client.connect(wsUri,socket).get(10,TimeUnit.SECONDS);
client.connect(wsUri).get(10,TimeUnit.SECONDS);
}
private void send(byte op, byte[] data, int fragment)

View File

@ -0,0 +1,73 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal;
import static org.hamcrest.Matchers.*;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URISyntaxException;
import org.junit.Assert;
import org.junit.Test;
public class ConnectionManagerTest
{
private void assertToSocketAddress(String uriStr, String expectedHost, int expectedPort) throws URISyntaxException
{
URI uri = new URI(uriStr);
InetSocketAddress addr = ConnectionManager.toSocketAddress(uri);
Assert.assertThat("URI (" + uri + ").host",addr.getHostName(),is(expectedHost));
Assert.assertThat("URI (" + uri + ").port",addr.getPort(),is(expectedPort));
}
@Test
public void testToSocketAddress_AltWsPort() throws Exception
{
assertToSocketAddress("ws://localhost:8099","localhost",8099);
}
@Test
public void testToSocketAddress_AltWssPort() throws Exception
{
assertToSocketAddress("wss://localhost","localhost",443);
}
@Test
public void testToSocketAddress_DefaultWsPort() throws Exception
{
assertToSocketAddress("ws://localhost","localhost",80);
}
@Test
public void testToSocketAddress_DefaultWsPort_Path() throws Exception
{
assertToSocketAddress("ws://localhost/sockets/chat","localhost",80);
}
@Test
public void testToSocketAddress_DefaultWssPort() throws Exception
{
assertToSocketAddress("wss://localhost:9443","localhost",9443);
}
@Test
public void testToSocketAddress_DefaultWssPort_Path() throws Exception
{
assertToSocketAddress("wss://localhost/sockets/chat","localhost",443);
}
}

View File

@ -0,0 +1,136 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.client.internal.io;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.junit.Assert;
import org.junit.Test;
public class HttpResponseHeaderParserTest
{
private void appendUtf8(ByteBuffer buf, String line)
{
buf.put(ByteBuffer.wrap(StringUtil.getBytes(line,StringUtil.__UTF8)));
}
@Test
public void testParseRealWorldResponse()
{
// Arbitrary Http Response Headers seen in the wild.
// Request URI -> https://ssl.google-analytics.com/__utm.gif
List<String> expected = new ArrayList<>();
expected.add("HTTP/1.0 200 OK");
expected.add("Date: Thu, 09 Aug 2012 16:16:39 GMT");
expected.add("Content-Length: 35");
expected.add("X-Content-Type-Options: nosniff");
expected.add("Pragma: no-cache");
expected.add("Expires: Wed, 19 Apr 2000 11:43:00 GMT");
expected.add("Last-Modified: Wed, 21 Jan 2004 19:51:30 GMT");
expected.add("Content-Type: image/gif");
expected.add("Cache-Control: private, no-cache, no-cache=Set-Cookie, proxy-revalidate");
expected.add("Age: 518097");
expected.add("Server: GFE/2.0");
expected.add("Connection: Keep-Alive");
expected.add("");
// Prepare Buffer
ByteBuffer buf = ByteBuffer.allocate(512);
for (String line : expected)
{
appendUtf8(buf,line + "\r\n");
}
BufferUtil.flipToFlush(buf,0);
// Parse Buffer
HttpResponseHeaderParser parser = new HttpResponseHeaderParser();
UpgradeResponse response = parser.parse(buf);
Assert.assertThat("Response",response,notNullValue());
Assert.assertThat("Response.statusCode",response.getStatusCode(),is(200));
Assert.assertThat("Response.statusReason",response.getStatusReason(),is("OK"));
Assert.assertThat("Response.header[age]",response.getHeaderValue("age"),is("518097"));
}
@Test
public void testParseRealWorldResponse_SmallBuffers()
{
// Arbitrary Http Response Headers seen in the wild.
// Request URI -> https://ssl.google-analytics.com/__utm.gif
List<String> expected = new ArrayList<>();
expected.add("HTTP/1.0 200 OK");
expected.add("Date: Thu, 09 Aug 2012 16:16:39 GMT");
expected.add("Content-Length: 35");
expected.add("X-Content-Type-Options: nosniff");
expected.add("Pragma: no-cache");
expected.add("Expires: Wed, 19 Apr 2000 11:43:00 GMT");
expected.add("Last-Modified: Wed, 21 Jan 2004 19:51:30 GMT");
expected.add("Content-Type: image/gif");
expected.add("Cache-Control: private, no-cache, no-cache=Set-Cookie, proxy-revalidate");
expected.add("Age: 518097");
expected.add("Server: GFE/2.0");
expected.add("Connection: Keep-Alive");
expected.add("");
// Prepare Buffer
ByteBuffer buf = ByteBuffer.allocate(512);
for (String line : expected)
{
appendUtf8(buf,line + "\r\n");
}
BufferUtil.flipToFlush(buf,0);
// Prepare small buffers to simulate a slow read/fill/parse from the network
ByteBuffer small1 = buf.slice();
ByteBuffer small2 = buf.slice();
ByteBuffer small3 = buf.slice();
small1.limit(50);
small2.position(50);
small2.limit(70);
small3.position(70);
// Parse Buffer
HttpResponseHeaderParser parser = new HttpResponseHeaderParser();
UpgradeResponse response;
// Parse small 1
response = parser.parse(small1);
Assert.assertThat("Small 1",response,nullValue());
// Parse small 2
response = parser.parse(small2);
Assert.assertThat("Small 2",response,nullValue());
// Parse small 3
response = parser.parse(small3);
Assert.assertThat("Small 3",response,notNullValue());
Assert.assertThat("Response.statusCode",response.getStatusCode(),is(200));
Assert.assertThat("Response.statusReason",response.getStatusReason(),is("OK"));
Assert.assertThat("Response.header[age]",response.getHeaderValue("age"),is("518097"));
}
}

View File

@ -0,0 +1,43 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.api;
/**
* Exception during WebSocket Upgrade Handshake.
*/
@SuppressWarnings("serial")
public class UpgradeException extends WebSocketException
{
public UpgradeException()
{
super();
}
public UpgradeException(String message)
{
super(message);
}
public UpgradeException(String message, Throwable cause)
{
super(message,cause);
}
public UpgradeException(Throwable cause)
{
super(cause);
}
}

View File

@ -13,17 +13,16 @@
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server;
package org.eclipse.jetty.websocket.api;
import java.util.List;
import java.util.Map;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public interface WebSocketRequest
public interface UpgradeRequest
{
// TODO: getSession
// TODO: getCookies
// TODO: getRequestAttributes ?
public Map<String, String> getCookieMap();
public List<ExtensionConfig> getExtensions();
@ -40,4 +39,6 @@ public interface WebSocketRequest
public boolean hasSubProtocol(String test);
public boolean isOrigin(String test);
public void setSubProtocols(String string);
}

View File

@ -13,15 +13,19 @@
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server;
package org.eclipse.jetty.websocket.api;
import java.io.IOException;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public interface WebSocketResponse
public interface UpgradeResponse
{
public void addHeader(String name, String value);
/**
* Get the accepted WebSocket protocol.
*
@ -36,6 +40,18 @@ public interface WebSocketResponse
*/
public List<ExtensionConfig> getExtensions();
public Set<String> getHeaderNamesSet();
public String getHeaderValue(String name);
public Iterator<String> getHeaderValues(String name);
public int getStatusCode();
public String getStatusReason();
public boolean isSuccess();
/**
* Issue a forbidden upgrade response.
* <p>
@ -64,7 +80,7 @@ public interface WebSocketResponse
* <p>
* Notes:
* <ul>
* <li>Per the spec you cannot add extensions that have not been seen in the {@link WebSocketRequest}, just remove entries you don't want to use</li>
* <li>Per the spec you cannot add extensions that have not been seen in the {@link UpgradeRequest}, just remove entries you don't want to use</li>
* <li>If this is unused, or a null is passed, then the list negotiation will follow default behavior and use the complete list of extensions that are
* available in this WebSocket server implementation.</li>
* </ul>
@ -73,4 +89,8 @@ public interface WebSocketResponse
* the list of extensions to use.
*/
public void setExtensions(List<ExtensionConfig> extensions);
public void setHeader(String name, String value);
public void validateWebSocketHash(String expectedHash) throws UpgradeException;
}

View File

@ -15,8 +15,10 @@
//========================================================================
package org.eclipse.jetty.websocket.protocol;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.*;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.TypeUtil;
import org.junit.Assert;
import org.junit.Test;
@ -25,6 +27,33 @@ public class AcceptHashTest
@Test
public void testHash()
{
Assert.assertThat(AcceptHash.hashKey("dGhlIHNhbXBsZSBub25jZQ=="),is("s3pPLMBiTxaQ9kYGzzhZRbK+xOo="));
byte key[] = TypeUtil.fromHexString("00112233445566778899AABBCCDDEEFF");
Assert.assertThat("Key size",key.length,is(16));
// what the client sends
String clientKey = String.valueOf(B64Code.encode(key));
// what the server responds with
String serverHash = AcceptHash.hashKey(clientKey);
// how the client validates
Assert.assertThat(serverHash,is("mVL6JKtNRC4tluIaFAW2hhMffgE="));
}
/**
* Test of values present in RFC-6455.
* <p>
* Note: client key bytes are "7468652073616d706c65206e6f6e6365"
*/
@Test
public void testRfcHashExample()
{
// What the client sends in the RFC
String clientKey = "dGhlIHNhbXBsZSBub25jZQ==";
// What the server responds with
String serverAccept = AcceptHash.hashKey(clientKey);
String expectedHash = "s3pPLMBiTxaQ9kYGzzhZRbK+xOo=";
Assert.assertThat(serverAccept,is(expectedHash));
}
}

View File

@ -17,15 +17,19 @@ package org.eclipse.jetty.websocket.server;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import javax.servlet.http.Cookie;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletRequestWrapper;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class ServletWebSocketRequest extends HttpServletRequestWrapper implements WebSocketRequest
public class ServletWebSocketRequest extends HttpServletRequestWrapper implements UpgradeRequest
{
private List<String> subProtocols = new ArrayList<>();
private List<ExtensionConfig> extensions;
@ -57,6 +61,17 @@ public class ServletWebSocketRequest extends HttpServletRequestWrapper implement
}
}
@Override
public Map<String, String> getCookieMap()
{
Map<String, String> ret = new HashMap<String, String>();
for (Cookie cookie : super.getCookies())
{
ret.put(cookie.getName(),cookie.getValue());
}
return ret;
}
@Override
public List<ExtensionConfig> getExtensions()
{
@ -129,4 +144,14 @@ public class ServletWebSocketRequest extends HttpServletRequestWrapper implement
return protocols;
}
/**
* Not implemented (not relevant) on server side.
*
* @see org.eclipse.jetty.websocket.api.UpgradeRequest#setSubProtocols(java.lang.String)
*/
@Override
public void setSubProtocols(String protocol)
{
/* not relevant for server side/servlet work */
}
}

View File

@ -17,23 +17,36 @@ package org.eclipse.jetty.websocket.server;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Set;
import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServletResponseWrapper;
import org.eclipse.jetty.websocket.api.UpgradeException;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.protocol.ExtensionConfig;
public class ServletWebSocketResponse extends HttpServletResponseWrapper implements WebSocketResponse
public class ServletWebSocketResponse extends HttpServletResponseWrapper implements UpgradeResponse
{
private String acceptedProtocol;
private List<ExtensionConfig> extensions = new ArrayList<>();
private boolean success = true;
public ServletWebSocketResponse(HttpServletResponse resp)
{
super(resp);
}
@Override
public void addHeader(String name, String value)
{
super.addHeader(name,value);
}
@Override
public String getAcceptedSubProtocol()
{
@ -46,9 +59,47 @@ public class ServletWebSocketResponse extends HttpServletResponseWrapper impleme
return this.extensions;
}
@Override
public Set<String> getHeaderNamesSet()
{
Collection<String> names = getHeaderNames();
return new HashSet<String>(names);
}
@Override
public String getHeaderValue(String name)
{
return super.getHeader(name);
}
@Override
public Iterator<String> getHeaderValues(String name)
{
return super.getHeaders(name).iterator();
}
@Override
public int getStatusCode()
{
throw new UnsupportedOperationException("Server cannot get Status Code");
}
@Override
public String getStatusReason()
{
throw new UnsupportedOperationException("Server cannot get Status Reason");
}
@Override
public boolean isSuccess()
{
return success;
}
@Override
public void sendForbidden(String message) throws IOException
{
success = false;
sendError(HttpServletResponse.SC_FORBIDDEN,message);
}
@ -63,4 +114,10 @@ public class ServletWebSocketResponse extends HttpServletResponseWrapper impleme
{
this.extensions = extensions;
}
@Override
public void validateWebSocketHash(String expectedHash) throws UpgradeException
{
throw new UnsupportedOperationException("Server cannot validate its own hash");
}
}

View File

@ -16,6 +16,8 @@
package org.eclipse.jetty.websocket.server;
import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
/**
* Abstract WebSocket creator interface.
@ -35,5 +37,5 @@ public interface WebSocketCreator
* the request details
* @return a websocket object to use, or null if no websocket should be created from this request.
*/
Object createWebSocket(WebSocketRequest req, WebSocketResponse resp);
Object createWebSocket(UpgradeRequest req, UpgradeResponse resp);
}

View File

@ -45,6 +45,8 @@ import org.eclipse.jetty.websocket.api.Extension;
import org.eclipse.jetty.websocket.api.ExtensionRegistry;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.driver.EventMethodsCache;
import org.eclipse.jetty.websocket.driver.WebSocketEventDriver;
import org.eclipse.jetty.websocket.extensions.WebSocketExtensionRegistry;
@ -151,7 +153,7 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
}
@Override
public Object createWebSocket(WebSocketRequest req, WebSocketResponse resp)
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
{
if (methodsCache.count() < 1)
{

View File

@ -15,9 +15,9 @@
//========================================================================
package org.eclipse.jetty.websocket.server.examples.echo;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.server.WebSocketCreator;
import org.eclipse.jetty.websocket.server.WebSocketRequest;
import org.eclipse.jetty.websocket.server.WebSocketResponse;
/**
* Example of setting up a creator to create appropriately via the proposed and negotiated protocols.
@ -29,7 +29,7 @@ public class EchoCreator implements WebSocketCreator
private LogSocket logSocket = new LogSocket();
@Override
public Object createWebSocket(WebSocketRequest req, WebSocketResponse resp)
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
{
for (String protocol : req.getSubProtocols())
{

View File

@ -30,9 +30,9 @@ import org.eclipse.jetty.websocket.annotations.OnWebSocketConnect;
import org.eclipse.jetty.websocket.annotations.OnWebSocketMessage;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.UpgradeRequest;
import org.eclipse.jetty.websocket.api.UpgradeResponse;
import org.eclipse.jetty.websocket.server.WebSocketCreator;
import org.eclipse.jetty.websocket.server.WebSocketRequest;
import org.eclipse.jetty.websocket.server.WebSocketResponse;
import org.eclipse.jetty.websocket.server.WebSocketServerFactory;
import org.eclipse.jetty.websocket.server.WebSocketServlet;
@ -51,7 +51,7 @@ public class WebSocketChatServlet extends WebSocketServlet implements WebSocketC
}
@Override
public Object createWebSocket(WebSocketRequest req, WebSocketResponse resp)
public Object createWebSocket(UpgradeRequest req, UpgradeResponse resp)
{
return new ChatWebSocket();
}