Cleaning up all of the old uncompilable websocket source

This commit is contained in:
Joakim Erdfelt 2012-07-03 11:31:16 -07:00
parent 7d4da60d05
commit d70174f159
9 changed files with 773 additions and 1720 deletions

View File

@ -1,600 +0,0 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.client;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.SocketAddress;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketConnection;
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.masks.Masker;
/* ------------------------------------------------------------ */
/**
* <p>{@link OldWebSocketClient} allows to create multiple connections to multiple destinations
* that can speak the websocket protocol.</p>
* <p>When creating websocket connections, {@link OldWebSocketClient} accepts a {@link WebSocket}
* object (to receive events from the server), and returns a {@link WebSocket.Connection} to
* send data to the server.</p>
* <p>Example usage is as follows:</p>
* <pre>
* WebSocketClientFactory factory = new WebSocketClientFactory();
* factory.start();
*
* WebSocketClient client = factory.newWebSocketClient();
* // Configure the client
*
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"), new WebSocket.OnTextMessage()
* {
* public void onOpen(Connection connection)
* {
* // open notification
* }
*
* public void onClose(int closeCode, String message)
* {
* // close notification
* }
*
* public void onMessage(String data)
* {
* // handle incoming message
* }
* }).get(5, TimeUnit.SECONDS);
*
* connection.sendMessage("Hello World");
* </pre>
*/
public class OldWebSocketClient
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(OldWebSocketClient.class.getName());
private final OldWebSocketClientFactory _factory;
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
private String _origin;
private String _protocol;
private int _maxIdleTime=-1;
private int _maxTextMessageSize=16*1024;
private int _maxBinaryMessageSize=-1;
private Masker _maskGen;
private SocketAddress _bindAddress;
/* ------------------------------------------------------------ */
/**
* <p>Creates a WebSocketClient from a private WebSocketClientFactory.</p>
* <p>This can be wasteful of resources if many clients are created.</p>
*
* @deprecated Use {@link OldWebSocketClientFactory#newWebSocketClient()}
* @throws Exception if the private WebSocketClientFactory fails to start
*/
@Deprecated
public OldWebSocketClient() throws Exception
{
_factory=new OldWebSocketClientFactory();
_factory.start();
_maskGen=_factory.getMaskGen();
}
/* ------------------------------------------------------------ */
/**
* <p>Creates a WebSocketClient with shared WebSocketClientFactory.</p>
*
* @param factory the shared {@link OldWebSocketClientFactory}
*/
public OldWebSocketClient(OldWebSocketClientFactory factory)
{
_factory=factory;
_maskGen=_factory.getMaskGen();
}
/* ------------------------------------------------------------ */
/**
* @return The WebSocketClientFactory this client was created with.
*/
public OldWebSocketClientFactory getFactory()
{
return _factory;
}
/* ------------------------------------------------------------ */
/**
* @return the address to bind the socket channel to
* @see #setBindAddress(SocketAddress)
*/
public SocketAddress getBindAddress()
{
return _bindAddress;
}
/* ------------------------------------------------------------ */
/**
* @param bindAddress the address to bind the socket channel to
* @see #getBindAddress()
*/
public void setBindAddress(SocketAddress bindAddress)
{
this._bindAddress = bindAddress;
}
/* ------------------------------------------------------------ */
/**
* @return The maxIdleTime in ms for connections opened by this client,
* or -1 if the default from {@link OldWebSocketClientFactory#getSelectorManager()} is used.
* @see #setMaxIdleTime(int)
*/
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @param maxIdleTime The max idle time in ms for connections opened by this client
* @see #getMaxIdleTime()
*/
public void setMaxIdleTime(int maxIdleTime)
{
_maxIdleTime=maxIdleTime;
}
/* ------------------------------------------------------------ */
/**
* @return The subprotocol string for connections opened by this client.
* @see #setProtocol(String)
*/
public String getProtocol()
{
return _protocol;
}
/* ------------------------------------------------------------ */
/**
* @param protocol The subprotocol string for connections opened by this client.
* @see #getProtocol()
*/
public void setProtocol(String protocol)
{
_protocol = protocol;
}
/* ------------------------------------------------------------ */
/**
* @return The origin URI of the client
* @see #setOrigin(String)
*/
public String getOrigin()
{
return _origin;
}
/* ------------------------------------------------------------ */
/**
* @param origin The origin URI of the client (eg "http://example.com")
* @see #getOrigin()
*/
public void setOrigin(String origin)
{
_origin = origin;
}
/* ------------------------------------------------------------ */
/**
* <p>Returns the map of the cookies that are sent during the initial HTTP handshake
* that upgrades to the websocket protocol.</p>
* @return The read-write cookie map
*/
public Map<String,String> getCookies()
{
return _cookies;
}
/* ------------------------------------------------------------ */
/**
* @return The list of websocket protocol extensions
*/
public List<String> getExtensions()
{
return _extensions;
}
/* ------------------------------------------------------------ */
/**
* @return the mask generator to use, or null if not mask generator should be used
* @see #setMaskGen(Masker)
*/
public Masker getMaskGen()
{
return _maskGen;
}
/* ------------------------------------------------------------ */
/**
* @param maskGen the mask generator to use, or null if not mask generator should be used
* @see #getMaskGen()
*/
public void setMaskGen(Masker maskGen)
{
_maskGen = maskGen;
}
/* ------------------------------------------------------------ */
/**
* @return The initial maximum text message size (in characters) for a connection
*/
public int getMaxTextMessageSize()
{
return _maxTextMessageSize;
}
/* ------------------------------------------------------------ */
/**
* Set the initial maximum text message size for a connection. This can be changed by
* the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
* @param maxTextMessageSize The default maximum text message size (in characters) for a connection
*/
public void setMaxTextMessageSize(int maxTextMessageSize)
{
_maxTextMessageSize = maxTextMessageSize;
}
/* ------------------------------------------------------------ */
/**
* @return The initial maximum binary message size (in bytes) for a connection
*/
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessageSize;
}
/* ------------------------------------------------------------ */
/**
* Set the initial maximum binary message size for a connection. This can be changed by
* the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
* @param maxBinaryMessageSize The default maximum binary message size (in bytes) for a connection
*/
public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
{
_maxBinaryMessageSize = maxBinaryMessageSize;
}
/* ------------------------------------------------------------ */
/**
* <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
*
* @param uri The URI to connect to.
* @param websocket The {@link WebSocket} instance to handle incoming events.
* @param maxConnectTime The interval to wait for a successful connection
* @param units the units of the maxConnectTime
* @return A {@link WebSocket.Connection}
* @throws IOException if the connection fails
* @throws InterruptedException if the thread is interrupted
* @throws TimeoutException if the timeout elapses before the connection is completed
* @see #open(URI, WebSocket)
*/
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
{
try
{
return open(uri,websocket).get(maxConnectTime,units);
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
if (cause instanceof Error)
throw (Error)cause;
if (cause instanceof RuntimeException)
throw (RuntimeException)cause;
throw new RuntimeException(cause);
}
}
/* ------------------------------------------------------------ */
/**
* <p>Asynchronously opens a websocket connection and returns a {@link Future} to obtain the connection.</p>
* <p>The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.</p>
*
* @param uri The URI to connect to.
* @param websocket The {@link WebSocket} instance to handle incoming events.
* @return A {@link Future} to the {@link WebSocket.Connection}
* @throws IOException if the connection fails
* @see #open(URI, WebSocket, long, TimeUnit)
*/
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!_factory.isStarted())
throw new IllegalStateException("Factory !started");
InetSocketAddress address = toSocketAddress(uri);
SocketChannel channel = SocketChannel.open();
if (_bindAddress != null)
channel.socket().bind(_bindAddress);
channel.socket().setTcpNoDelay(true);
WebSocketFuture holder = new WebSocketFuture(websocket, uri, this, channel);
channel.configureBlocking(false);
channel.connect(address);
_factory.getSelectorManager().register(channel, holder);
return holder;
}
public static InetSocketAddress toSocketAddress(URI uri)
{
String scheme = uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme: " + scheme);
int port = uri.getPort();
if (port == 0)
throw new IllegalArgumentException("Bad WebSocket port: " + port);
if (port < 0)
port = "ws".equals(scheme) ? 80 : 443;
return new InetSocketAddress(uri.getHost(), port);
}
/* ------------------------------------------------------------ */
/** The Future Websocket Connection.
*/
static class WebSocketFuture implements Future<WebSocket.Connection>
{
final WebSocket _websocket;
final URI _uri;
final OldWebSocketClient _client;
final CountDownLatch _done = new CountDownLatch(1);
ByteChannel _channel;
WebSocketConnection _connection;
Throwable _exception;
private WebSocketFuture(WebSocket websocket, URI uri, OldWebSocketClient client, ByteChannel channel)
{
_websocket=websocket;
_uri=uri;
_client=client;
_channel=channel;
}
public void onConnection(WebSocketConnection connection)
{
try
{
_client.getFactory().addConnection(connection);
connection.getConnection().setMaxTextMessageSize(_client.getMaxTextMessageSize());
connection.getConnection().setMaxBinaryMessageSize(_client.getMaxBinaryMessageSize());
WebSocketConnection con;
synchronized (this)
{
if (_channel!=null)
_connection=connection;
con=_connection;
}
if (con!=null)
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)con.getConnection());
_websocket.onOpen(con.getConnection());
}
}
finally
{
_done.countDown();
}
}
public void handshakeFailed(Throwable ex)
{
try
{
ByteChannel channel=null;
synchronized (this)
{
if (_channel!=null)
{
channel=_channel;
_channel=null;
_exception=ex;
}
}
if (channel!=null)
{
if (ex instanceof ProtocolException)
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_PROTOCOL,ex.getMessage());
else
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,ex.getMessage());
}
}
finally
{
_done.countDown();
}
}
public Map<String,String> getCookies()
{
return _client.getCookies();
}
public String getProtocol()
{
return _client.getProtocol();
}
public WebSocket getWebSocket()
{
return _websocket;
}
public URI getURI()
{
return _uri;
}
public int getMaxIdleTime()
{
return _client.getMaxIdleTime();
}
public String getOrigin()
{
return _client.getOrigin();
}
public Masker getMaskGen()
{
return _client.getMaskGen();
}
@Override
public String toString()
{
return "[" + _uri + ","+_websocket+"]@"+hashCode();
}
public boolean cancel(boolean mayInterruptIfRunning)
{
try
{
ByteChannel channel=null;
synchronized (this)
{
if (_connection==null && _exception==null && _channel!=null)
{
channel=_channel;
_channel=null;
}
}
if (channel!=null)
{
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"cancelled");
return true;
}
return false;
}
finally
{
_done.countDown();
}
}
public boolean isCancelled()
{
synchronized (this)
{
return _channel==null && _connection==null;
}
}
public boolean isDone()
{
synchronized (this)
{
return _connection!=null && _exception==null;
}
}
public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
{
try
{
return get(Long.MAX_VALUE,TimeUnit.SECONDS);
}
catch(TimeoutException e)
{
throw new IllegalStateException("The universe has ended",e);
}
}
public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException
{
_done.await(timeout,unit);
ByteChannel channel=null;
org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
Throwable exception;
synchronized (this)
{
exception=_exception;
if (_connection==null)
{
exception=_exception;
channel=_channel;
_channel=null;
}
else
connection=_connection.getConnection();
}
if (channel!=null)
closeChannel(channel,WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,"timeout");
if (exception!=null)
throw new ExecutionException(exception);
if (connection!=null)
return connection;
throw new TimeoutException();
}
private void closeChannel(ByteChannel channel,int code, String message)
{
try
{
_websocket.onClose(code,message);
}
catch(Exception e)
{
__log.warn(e);
}
try
{
channel.close();
}
catch(IOException e)
{
__log.debug(e);
}
}
}
}

View File

@ -1,426 +0,0 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.client;
import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.sql.Connection;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Random;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.net.ssl.SSLEngine;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.SelectChannelEndPoint;
import org.eclipse.jetty.io.SelectorManager;
import org.eclipse.jetty.io.ssl.SslConnection;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.ssl.SslContextFactory;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.websocket.annotations.WebSocket;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.eclipse.jetty.websocket.extensions.Extension;
import org.eclipse.jetty.websocket.masks.Masker;
/* ------------------------------------------------------------ */
/**
* <p>WebSocketClientFactory contains the common components needed by multiple {@link OldWebSocketClient} instances
* (for example, a {@link ThreadPool}, a {@link SelectorManager NIO selector}, etc).</p>
* <p>WebSocketClients with different configurations should share the same factory to avoid to waste resources.</p>
* <p>If a ThreadPool or MaskGen is passed in the constructor, then it is not added with {@link AggregateLifeCycle#addBean(Object)},
* so it's lifecycle must be controlled externally.
*
* @see OldWebSocketClient
*/
public class OldWebSocketClientFactory extends AggregateLifeCycle
{
private final static Logger LOG = Log.getLogger(OldWebSocketClientFactory.class);
private final Queue<WebSocketConnection> connections = new ConcurrentLinkedQueue<WebSocketConnection>();
private WebSocketPolicy policy;
/**
* <p>Creates a WebSocketClientFactory with the default configuration.</p>
*/
public OldWebSocketClientFactory()
{
policy = WebSocketPolicy.newClientPolicy();
}
public WebSocketPolicy getPolicy()
{
return policy;
}
@Override
protected void doStop() throws Exception
{
closeConnections();
super.doStop();
}
/**
* <p>Creates and returns a new instance of a {@link OldWebSocketClient}, configured with this
* WebSocketClientFactory instance.</p>
*
* @return a new {@link OldWebSocketClient} instance
*/
public OldWebSocketClient newWebSocketClient()
{
return new OldWebSocketClient(this);
}
protected boolean addConnection(WebSocketConnection connection)
{
return isRunning() && connections.add(connection);
}
protected boolean removeConnection(WebSocketConnection connection)
{
return connections.remove(connection);
}
protected void closeConnections()
{
for (WebSocketConnection connection : connections)
{
try
{
connection.close();
}
catch (IOException e)
{
LOG.info(e);
}
}
}
/**
* WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey key) throws IOException
{
OldWebSocketClient.WebSocketFuture holder = (OldWebSocketClient.WebSocketFuture)key.attachment();
int maxIdleTime = holder.getMaxIdleTime();
if (maxIdleTime < 0)
maxIdleTime = (int)getMaxIdleTime();
SelectChannelEndPoint result = new SelectChannelEndPoint(channel, selectSet, key, maxIdleTime);
AsyncEndPoint endPoint = result;
// Detect if it is SSL, and wrap the connection if so
if ("wss".equals(holder.getURI().getScheme()))
{
SSLEngine sslEngine = newSslEngine(channel);
SslConnection sslConnection = new SslConnection(sslEngine, endPoint);
endPoint.setConnection(sslConnection);
endPoint = sslConnection.getSslEndPoint();
}
AsyncConnection connection = selectSet.getManager().newConnection(channel, endPoint, holder);
endPoint.setConnection(connection);
return result;
}
@Override
public AsyncConnection newConnection(SocketChannel channel, AsyncEndPoint endpoint, Object attachment)
{
OldWebSocketClient.WebSocketFuture holder = (OldWebSocketClient.WebSocketFuture)attachment;
return new HandshakeConnection(endpoint, holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class ??
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
LOG.debug("upgrade {} -> {}", oldConnection, endpoint.getConnection());
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().onClose();
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof OldWebSocketClient.WebSocketFuture))
super.connectionFailed(channel, ex, attachment);
else
{
__log.debug(ex);
OldWebSocketClient.WebSocketFuture future = (OldWebSocketClient.WebSocketFuture)attachment;
future.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/**
* Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection implements AsyncConnection
{
private final AsyncEndPoint _endp;
private final OldWebSocketClient.WebSocketFuture _future;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
private ByteArrayBuffer _handshake;
public HandshakeConnection(AsyncEndPoint endpoint, OldWebSocketClient.WebSocketFuture future)
{
super(endpoint, System.currentTimeMillis());
_endp = endpoint;
_future = future;
byte[] bytes = new byte[16];
new Random().nextBytes(bytes);
_key = new String(B64Code.encode(bytes));
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(), null);
_parser = new HttpParser(buffers, _endp, new HttpParser.EventHandler()
{
@Override
public void startResponse(ByteBuffer version, int status, ByteBuffer reason) throws IOException
{
if (status != 101)
{
_error = "Bad response status " + status + " " + reason;
_endp.close();
}
}
@Override
public void parsedHeader(ByteBuffer name, ByteBuffer value) throws IOException
{
if (__ACCEPT.equals(name))
_accept = value.toString();
}
@Override
public void startRequest(ByteBuffer method, ByteBuffer url, ByteBuffer version) throws IOException
{
if (_error == null)
_error = "Bad response: " + method + " " + url + " " + version;
_endp.close();
}
@Override
public void content(ByteBuffer ref) throws IOException
{
if (_error == null)
_error = "Bad response. " + ref.length() + "B of content?";
_endp.close();
}
});
}
private boolean handshake()
{
if (_handshake==null)
{
String path = _future.getURI().getPath();
if (path == null || path.length() == 0)
path = "/";
if (_future.getURI().getRawQuery() != null)
path += "?" + _future.getURI().getRawQuery();
String origin = _future.getOrigin();
StringBuilder request = new StringBuilder(512);
request.append("GET ").append(path).append(" HTTP/1.1\r\n")
.append("Host: ").append(_future.getURI().getHost()).append(":")
.append(_future.getURI().getPort()).append("\r\n")
.append("Upgrade: websocket\r\n")
.append("Connection: Upgrade\r\n")
.append("Sec-WebSocket-Key: ")
.append(_key).append("\r\n");
if (origin != null)
request.append("Origin: ").append(origin).append("\r\n");
request.append("Sec-WebSocket-Version: ").append(WebSocketConnectionRFC6455.VERSION).append("\r\n");
if (_future.getProtocol() != null)
request.append("Sec-WebSocket-Protocol: ").append(_future.getProtocol()).append("\r\n");
Map<String, String> cookies = _future.getCookies();
if (cookies != null && cookies.size() > 0)
{
for (String cookie : cookies.keySet())
request.append("Cookie: ")
.append(QuotedStringTokenizer.quoteIfNeeded(cookie, HttpFields.__COOKIE_DELIM))
.append("=")
.append(QuotedStringTokenizer.quoteIfNeeded(cookies.get(cookie), HttpFields.__COOKIE_DELIM))
.append("\r\n");
}
request.append("\r\n");
_handshake=new ByteArrayBuffer(request.toString(), false);
}
// TODO extensions
try
{
int len = _handshake.length();
int flushed = _endp.flush(_handshake);
if (flushed<0)
throw new IOException("incomplete handshake");
}
catch (IOException e)
{
_future.handshakeFailed(e);
}
return _handshake.length()==0;
}
public Connection handle() throws IOException
{
while (_endp.isOpen() && !_parser.isComplete())
{
if (_handshake==null || _handshake.length()>0)
if (!handshake())
return this;
if (!_parser.parseAvailable())
{
if (_endp.isInputShutdown())
_future.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
}
}
if (_error == null)
{
if (_accept == null)
{
_error = "No Sec-WebSocket-Accept";
}
else if (!WebSocketConnectionRFC6455.hashKey(_key).equals(_accept))
{
_error = "Bad Sec-WebSocket-Accept";
}
else
{
WebSocketConnection connection = newWebSocketConnection();
ByteBuffer header = _parser.getHeaderBuffer();
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_future.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
}
private WebSocketConnection newWebSocketConnection() throws IOException
{
return new WebSocketClientConnection(
_future._client.getFactory(),
_future.getWebSocket(),
_endp,
_buffers,
System.currentTimeMillis(),
_future.getMaxIdleTime(),
_future.getProtocol(),
null,
WebSocketConnectionRFC6455.VERSION,
_future.getMaskGen());
}
public void onInputShutdown() throws IOException
{
_endp.close();
}
public boolean isIdle()
{
return false;
}
public boolean isSuspended()
{
return false;
}
public void onClose()
{
if (_error != null)
_future.handshakeFailed(new ProtocolException(_error));
else
_future.handshakeFailed(new EOFException());
}
}
private static class WebSocketClientConnection extends WebSocketConnectionRFC6455
{
private final OldWebSocketClientFactory factory;
public WebSocketClientConnection(OldWebSocketClientFactory factory, WebSocket webSocket, EndPoint endPoint, WebSocketBuffers buffers, long timeStamp, int maxIdleTime, String protocol, List<Extension> extensions, int draftVersion, Masker maskGen) throws IOException
{
super(webSocket, endPoint, buffers, timeStamp, maxIdleTime, protocol, extensions, draftVersion, maskGen);
this.factory = factory;
}
@Override
public void onClose()
{
super.onClose();
factory.removeConnection(this);
}
}
}

View File

@ -74,9 +74,15 @@ public class WebSocketClient
}
}
private final WebSocketClientFactory factory;
public static InetSocketAddress toSocketAddress(URI uri)
{
// TODO Auto-generated method stub
return null;
}
private final WebSocketClientFactory factory;
private SocketAddress bindAddress;
private WebSocketPolicy policy;
public WebSocketClient(WebSocketClientFactory factory)
@ -134,4 +140,10 @@ public class WebSocketClient
{
this.bindAddress = bindAddress;
}
public void setProtocol(String protocol)
{
// TODO Auto-generated method stub
}
}

View File

@ -33,7 +33,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public WebSocketClientFactory()
{
this(null,null);
this(new QueuedThreadPool(),null);
}
public WebSocketClientFactory(Executor threadPool)
@ -41,14 +41,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle
this(threadPool,null);
}
public WebSocketClientFactory(Executor threadPool, SslContextFactory sslContextFactory)
public WebSocketClientFactory(Executor executor, SslContextFactory sslContextFactory)
{
if (threadPool == null)
if (executor == null)
{
threadPool = new QueuedThreadPool();
throw new IllegalArgumentException("Executor is required");
}
this.executor = threadPool;
addBean(threadPool);
this.executor = executor;
addBean(executor);
if (sslContextFactory != null)
{
@ -117,7 +117,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
return selector;
}
public WebSocketClient newSPDYClient()
public WebSocketClient newWebSocketClient()
{
return new WebSocketClient(this);
}

View File

@ -103,9 +103,8 @@ public class HandshakeConnection extends AbstractAsyncConnection implements Asyn
{
FutureCallback<ConnectFuture> callback = new FutureCallback<>();
getEndPoint().write(future,callback,buf);
// TODO: block on read?
// TODO: read response & upgrade via async callback
callback.get();
callback.get(); // TODO: block on read?
}
finally
{

View File

@ -1,316 +0,0 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.client;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.WebSocket;
import org.eclipse.jetty.websocket.WebSocketConnectionRFC6455;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.client.OldWebSocketClient;
import org.eclipse.jetty.websocket.client.OldWebSocketClientFactory;
/**
* This is not a general purpose websocket client.
* It's only for testing the websocket server and is hardwired to a specific draft version of the protocol.
*/
public class TestClient implements WebSocket.OnFrame
{
private static OldWebSocketClientFactory __clientFactory = new OldWebSocketClientFactory();
private static boolean _verbose=false;
private static final Random __random = new Random();
private final String _host;
private final int _port;
private final String _protocol;
private final int _timeout;
private static boolean __quiet;
private static int __framesSent;
private static int __messagesSent;
private static AtomicInteger __framesReceived=new AtomicInteger();
private static AtomicInteger __messagesReceived=new AtomicInteger();
private static AtomicLong __totalTime=new AtomicLong();
private static AtomicLong __minDuration=new AtomicLong(Long.MAX_VALUE);
private static AtomicLong __maxDuration=new AtomicLong(Long.MIN_VALUE);
private static long __start;
private BlockingQueue<Long> _starts = new LinkedBlockingQueue<Long>();
int _messageBytes;
int _frames;
byte _opcode=-1;
private volatile WebSocket.FrameConnection _connection;
private final CountDownLatch _handshook = new CountDownLatch(1);
public void onOpen(Connection connection)
{
if (_verbose)
System.err.printf("%s#onHandshake %s %s\n",this.getClass().getSimpleName(),connection,connection.getClass().getSimpleName());
}
public void onClose(int closeCode, String message)
{
_handshook.countDown();
}
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
try
{
if (_connection.isClose(opcode))
return false;
__framesReceived.incrementAndGet();
_frames++;
_messageBytes+=length;
if (_opcode==-1)
_opcode=opcode;
if (_connection.isControl(opcode) || _connection.isMessageComplete(flags))
{
int recv =__messagesReceived.incrementAndGet();
Long start=_starts.poll();
if (start!=null)
{
long duration = System.nanoTime()-start.longValue();
long max=__maxDuration.get();
while(duration>max && !__maxDuration.compareAndSet(max,duration))
max=__maxDuration.get();
long min=__minDuration.get();
while(duration<min && !__minDuration.compareAndSet(min,duration))
min=__minDuration.get();
__totalTime.addAndGet(duration);
if (!__quiet)
System.out.printf("%d bytes from %s: frames=%d req=%d time=%.1fms opcode=0x%s\n",_messageBytes,_host,_frames,recv,((double)duration/1000000.0),TypeUtil.toHexString(_opcode));
}
_frames=0;
_messageBytes=0;
_opcode=-1;
}
}
catch(Exception e)
{
e.printStackTrace();
}
return false;
}
public void onHandshake(FrameConnection connection)
{
_connection=connection;
_handshook.countDown();
}
public TestClient(String host, int port,String protocol, int timeoutMS) throws Exception
{
_host=host;
_port=port;
_protocol=protocol;
_timeout=timeoutMS;
}
private void open() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(__clientFactory);
client.setProtocol(_protocol);
client.setMaxIdleTime(_timeout);
client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
}
public void ping(byte opcode,byte[] data,int fragment) throws Exception
{
_starts.add(System.nanoTime());
int off=0;
int len=data.length;
if (fragment>0&& len>fragment)
len=fragment;
__messagesSent++;
while(off<data.length)
{
__framesSent++;
byte flags= (byte)(off+len==data.length?0x8:0);
byte op=(byte)(off==0?opcode:WebSocketConnectionRFC6455.OP_CONTINUATION);
if (_verbose)
System.err.printf("%s#sendFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(op),TypeUtil.toHexString(data,off,len));
_connection.sendFrame(flags,op,data,off,len);
off+=len;
if(data.length-off>len)
len=data.length-off;
if (fragment>0&& len>fragment)
len=fragment;
}
}
public void disconnect() throws Exception
{
if (_connection!=null)
{
_connection.close();
}
}
private static void usage(String[] args)
{
System.err.println("ERROR: "+Arrays.asList(args));
System.err.println("USAGE: java -cp CLASSPATH "+TestClient.class+" [ OPTIONS ]");
System.err.println(" -h|--host HOST (default localhost)");
System.err.println(" -p|--port PORT (default 8080)");
System.err.println(" -b|--binary");
System.err.println(" -v|--verbose");
System.err.println(" -q|--quiet");
System.err.println(" -c|--count n (default 10)");
System.err.println(" -s|--size n (default 64)");
System.err.println(" -f|--fragment n (default 4000) ");
System.err.println(" -P|--protocol echo|echo-assemble|echo-fragment|echo-broadcast");
System.err.println(" -C|--clients n (default 1) ");
System.err.println(" -d|--delay n (default 1000ms) ");
System.exit(1);
}
public static void main(String[] args) throws Exception
{
__clientFactory.start();
String host="localhost";
int port=8080;
String protocol=null;
int count=10;
int size=64;
int fragment=4000;
boolean binary=false;
int clients=1;
int delay=1000;
for (int i=0;i<args.length;i++)
{
String a=args[i];
if ("-p".equals(a)||"--port".equals(a))
port=Integer.parseInt(args[++i]);
else if ("-h".equals(a)||"--host".equals(a))
host=args[++i];
else if ("-c".equals(a)||"--count".equals(a))
count=Integer.parseInt(args[++i]);
else if ("-s".equals(a)||"--size".equals(a))
size=Integer.parseInt(args[++i]);
else if ("-f".equals(a)||"--fragment".equals(a))
fragment=Integer.parseInt(args[++i]);
else if ("-P".equals(a)||"--protocol".equals(a))
protocol=args[++i];
else if ("-v".equals(a)||"--verbose".equals(a))
_verbose=true;
else if ("-b".equals(a)||"--binary".equals(a))
binary=true;
else if ("-C".equals(a)||"--clients".equals(a))
clients=Integer.parseInt(args[++i]);
else if ("-d".equals(a)||"--delay".equals(a))
delay=Integer.parseInt(args[++i]);
else if ("-q".equals(a)||"--quiet".equals(a))
__quiet=true;
else if (a.startsWith("-"))
usage(args);
}
TestClient[] client = new TestClient[clients];
try
{
__start=System.currentTimeMillis();
protocol=protocol==null?"echo":protocol;
for (int i=0;i<clients;i++)
{
client[i]=new TestClient(host,port,protocol==null?null:protocol,60000);
client[i].open();
}
System.out.println("Jetty WebSocket PING "+host+":"+port+
" ("+ new InetSocketAddress(host,port)+") "+clients+" clients "+protocol);
for (int p=0;p<count;p++)
{
long next = System.currentTimeMillis()+delay;
byte opcode=binary?WebSocketConnectionRFC6455.OP_BINARY:WebSocketConnectionRFC6455.OP_TEXT;
byte data[]=null;
if (opcode==WebSocketConnectionRFC6455.OP_TEXT)
{
StringBuilder b = new StringBuilder();
while (b.length()<size)
b.append('A'+__random.nextInt(26));
data=b.toString().getBytes(StringUtil.__UTF8);
}
else
{
data= new byte[size];
__random.nextBytes(data);
}
for (int i=0;i<clients;i++)
client[i].ping(opcode,data,opcode==WebSocketConnectionRFC6455.OP_PING?-1:fragment);
while(System.currentTimeMillis()<next)
Thread.sleep(10);
}
}
finally
{
for (int i=0;i<clients;i++)
if (client[i]!=null)
client[i].disconnect();
long duration=System.currentTimeMillis()-__start;
System.out.println("--- "+host+" websocket ping statistics using "+clients+" connection"+(clients>1?"s":"")+" ---");
System.out.printf("%d/%d frames sent/recv, %d/%d mesg sent/recv, time %dms %dm/s %.2fbps%n",
__framesSent,__framesReceived.get(),
__messagesSent,__messagesReceived.get(),
duration,(1000L*__messagesReceived.get()/duration),
1000.0D*__messagesReceived.get()*8*size/duration/1024/1024);
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get()/1000000.0,__messagesReceived.get()==0?0.0:(__totalTime.get()/__messagesReceived.get()/1000000.0),__maxDuration.get()/1000000.0);
__clientFactory.stop();
}
}
}

View File

@ -15,16 +15,12 @@
*******************************************************************************/
package org.eclipse.jetty.websocket.client;
import java.io.BufferedReader;
import static org.hamcrest.Matchers.*;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.sql.Connection;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
@ -37,10 +33,12 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.api.WebSocketListener;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer;
import org.eclipse.jetty.websocket.masks.ZeroMasker;
import org.eclipse.jetty.websocket.client.blockhead.BlockheadServer.ServerConnection;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -48,63 +46,84 @@ import org.junit.Test;
public class WebSocketClientTest
{
public static class TrackingSocket extends WebSocketAdapter
{
public AtomicBoolean open = new AtomicBoolean(false);
public AtomicInteger close = new AtomicInteger(-1);
public StringBuilder closeMessage = new StringBuilder();
public CountDownLatch openLatch = new CountDownLatch(1);
public CountDownLatch closeLatch = new CountDownLatch(1);
public CountDownLatch dataLatch = new CountDownLatch(1);
public BlockingQueue<String> messageQueue = new BlockingArrayQueue<String>();
public void assertClose(int expectedStatusCode, String expectedReason)
{
assertCloseCode(expectedStatusCode);
assertCloseReason(expectedReason);
}
public void assertCloseCode(int expectedCode)
{
Assert.assertThat("Close Code",close.get(),is(expectedCode));
}
private void assertCloseReason(String expectedReason)
{
Assert.assertThat("Close Reaosn",closeMessage.toString(),is(expectedReason));
}
public void assertIsOpen()
{
assertWasOpened();
assertNotClosed();
}
public void assertNotClosed()
{
Assert.assertThat("Close Code",close.get(),is(-1));
}
public void assertNotOpened()
{
Assert.assertThat("Opened State",open.get(),is(false));
}
public void assertWasOpened()
{
Assert.assertThat("Opened State",open.get(),is(true));
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
dataLatch.countDown();
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
close.set(statusCode);
closeMessage.append(reason);
closeLatch.countDown();
}
@Override
public void onWebSocketConnect(WebSocketConnection connection)
{
open.set(true);
openLatch.countDown();
}
@Override
public void onWebSocketText(String message)
{
dataLatch.countDown();
messageQueue.add(message);
}
}
private BlockheadServer server;
private void accept(Socket connection) throws IOException
{
String key = "not sent";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line = in.readLine(); line != null; line = in.readLine())
{
if (line.length() == 0)
{
break;
}
if (line.startsWith("Sec-WebSocket-Key:"))
{
key = line.substring(18).trim();
}
}
connection.getOutputStream().write(
("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: " + WebSocketConnectionRFC6455.hashKey(key) + "\r\n" + "\r\n").getBytes());
}
private void respondToClient(Socket connection, String serverResponse) throws IOException
{
InputStream in = null;
InputStreamReader isr = null;
BufferedReader buf = null;
OutputStream out = null;
try
{
in = connection.getInputStream();
isr = new InputStreamReader(in);
buf = new BufferedReader(isr);
String line;
while ((line = buf.readLine()) != null)
{
// System.err.println(line);
if (line.length() == 0)
{
// Got the "\r\n" line.
break;
}
}
// System.out.println("[Server-Out] " + serverResponse);
out = connection.getOutputStream();
out.write(serverResponse.getBytes());
out.flush();
}
finally
{
IO.close(buf);
IO.close(isr);
IO.close(in);
IO.close(out);
}
}
@Before
public void startServer() throws Exception
{
@ -121,63 +140,44 @@ public class WebSocketClientTest
@Test
public void testAsyncConnectionRefused() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
TrackingSocket wsocket = new TrackingSocket();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
public void onOpen(Connection connection)
{
open.set(true);
}
});
// Intentionally bad port
URI wsUri = new URI("ws://127.0.0.1:1");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Throwable error = null;
try
{
future.get(1,TimeUnit.SECONDS);
Assert.fail();
Assert.fail("Expected ExecutionException");
}
catch (ExecutionException e)
{
error = e.getCause();
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,close.get());
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
Assert.assertTrue(error instanceof ConnectException);
}
@Test
public void testBadHandshake() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
TrackingSocket wsocket = new TrackingSocket();
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Socket connection = _server.accept();
respondToClient(connection,"HTTP/1.1 404 NOT FOUND\r\n\r\n");
ServerConnection connection = server.accept();
connection.respond("HTTP/1.1 404 NOT FOUND\r\n\r\n");
Throwable error = null;
try
@ -190,8 +190,8 @@ public class WebSocketClientTest
error = e.getCause();
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,close.get());
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.PROTOCOL);
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND") > 0);
@ -200,25 +200,16 @@ public class WebSocketClientTest
@Test
public void testBadUpgrade() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
TrackingSocket wsocket = new TrackingSocket();
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Socket connection = _server.accept();
respondToClient(connection,"HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
ServerConnection connection = server.accept();
connection.respond("HTTP/1.1 101 Upgrade\r\n" + "Sec-WebSocket-Accept: rubbish\r\n" + "\r\n");
Throwable error = null;
try
@ -230,86 +221,86 @@ public class WebSocketClientTest
{
error = e.getCause();
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_PROTOCOL,close.get());
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.PROTOCOL);
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("Bad Sec-WebSocket-Accept") >= 0);
Assert.assertThat("Error Message",error.getMessage(),containsString("Bad Sec-WebSocket-Accept"));
}
@Test
public void testBadURL() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
boolean bad = false;
final AtomicBoolean open = new AtomicBoolean();
TrackingSocket wsocket = new TrackingSocket();
try
{
client.open(new URI("http://localhost:8080"),new WebSocket()
{
public void onClose(int closeCode, String message)
{
}
// Intentionally bad scheme in URI
URI wsUri = new URI("http://localhost:8080");
public void onOpen(Connection connection)
{
open.set(true);
}
});
client.connect(wsUri,wsocket); // should toss exception
Assert.fail();
Assert.fail("Expected IllegalArgumentException");
}
catch (IllegalArgumentException e)
{
bad = true;
// expected path
wsocket.assertNotOpened();
}
Assert.assertTrue(bad);
Assert.assertFalse(open.get());
}
@Test
public void testBlockReceiving() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
client.setMaxIdleTime(60000);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setMaxIdleTime(60000);
final AtomicBoolean open = new AtomicBoolean();
final AtomicBoolean open = new AtomicBoolean(false);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
final StringBuilder closeMessage = new StringBuilder();
final Exchanger<String> exchanger = new Exchanger<String>();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket.OnTextMessage()
WebSocketListener socket = new WebSocketAdapter()
{
public void onClose(int closeCode, String message)
@Override
public void onWebSocketClose(int statusCode, String reason)
{
close.set(closeCode);
closeMessage.append(message);
close.set(statusCode);
closeMessage.append(reason);
_latch.countDown();
}
public void onMessage(String data)
@Override
public void onWebSocketConnect(WebSocketConnection connection)
{
open.set(true);
}
@Override
public void onWebSocketText(String message)
{
try
{
exchanger.exchange(data);
exchanger.exchange(message);
}
catch (InterruptedException e)
{
// e.printStackTrace();
}
}
};
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,socket);
Socket socket = _server.accept();
socket.setSoTimeout(60000);
accept(socket);
ServerConnection sconnection = server.accept();
sconnection.setSoTimeout(60000);
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
@ -362,8 +353,8 @@ public class WebSocketClientTest
long start = System.currentTimeMillis();
for (int i = 0; i < messages; i++)
{
socket.getOutputStream().write(send,0,send.length);
socket.getOutputStream().flush();
sconnection.write(send,0,send.length);
sconnection.flush();
}
while (consumer.isAlive())
@ -379,9 +370,9 @@ public class WebSocketClientTest
// Close with code
start = System.currentTimeMillis();
socket.getOutputStream().write(new byte[]
sconnection.write(new byte[]
{ (byte)0x88, (byte)0x02, (byte)4, (byte)87 },0,4);
socket.getOutputStream().flush();
sconnection.flush();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue((System.currentTimeMillis() - start) < 5000);
@ -392,37 +383,22 @@ public class WebSocketClientTest
@Test
public void testBlockSending() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
client.setMaxIdleTime(10000);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setMaxIdleTime(10000);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket.OnTextMessage()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
TrackingSocket wsocket = new TrackingSocket();
public void onMessage(String data)
{
}
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
public void onOpen(Connection connection)
{
open.set(true);
}
});
final ServerConnection ssocket = server.accept();
ssocket.upgrade();
final Socket socket = _server.accept();
accept(socket);
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
wsocket.assertWasOpened();
wsocket.assertNotClosed();
final int messages = 200000;
final AtomicLong totalB = new AtomicLong();
@ -442,7 +418,7 @@ public class WebSocketClientTest
while (len >= 0)
{
totalB.addAndGet(len);
len = socket.getInputStream().read(recv,0,recv.length);
len = ssocket.getInputStream().read(recv,0,recv.length);
Thread.sleep(10);
}
}
@ -463,7 +439,7 @@ public class WebSocketClientTest
String mesg = "This is a test message to send";
for (int i = 0; i < messages; i++)
{
connection.sendMessage(mesg);
connection.write(mesg);
}
// Duration for the write phase
@ -484,210 +460,136 @@ public class WebSocketClientTest
@Test
public void testConnectionNotAccepted() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
TrackingSocket wsocket = new TrackingSocket();
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
// Intentionally not accept incoming socket.
// server.accept();
Throwable error = null;
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
Assert.fail("Should have Timed Out");
}
catch (TimeoutException e)
{
error = e;
// Expected Path
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException);
}
@Test
public void testConnectionTimeout() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
TrackingSocket wsocket = new TrackingSocket();
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Assert.assertNotNull(_server.accept());
ServerConnection ssocket = server.accept();
Assert.assertNotNull(ssocket);
// Intentionally don't upgrade
// ssocket.upgrade();
Throwable error = null;
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
Assert.fail("Expected Timeout Exception");
}
catch (TimeoutException e)
{
error = e;
// Expected path
wsocket.assertNotOpened();
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException);
}
@Test
public void testIdle() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
client.setMaxIdleTime(500);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setMaxIdleTime(500);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
TrackingSocket wsocket = new TrackingSocket();
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Socket socket = _server.accept();
accept(socket);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
wsocket.assertWasOpened();
wsocket.assertNotClosed();
long start = System.currentTimeMillis();
_latch.await(10,TimeUnit.SECONDS);
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
Assert.assertTrue((System.currentTimeMillis() - start) < 5000);
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_NORMAL,close.get());
wsocket.assertCloseCode(StatusCode.NORMAL);
}
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
OldWebSocketClientFactory factory = new OldWebSocketClientFactory();
QueuedThreadPool threadPool = new QueuedThreadPool();
WebSocketClientFactory factory = new WebSocketClientFactory();
int bufferSize = 512;
OldWebSocketClientFactory factory = new OldWebSocketClientFactory(threadPool,new ZeroMasker(),bufferSize);
threadPool.start();
factory.start();
OldWebSocketClient client = new OldWebSocketClient(factory);
factory.getPolicy().setBufferSize(512);
WebSocketClient client = factory.newWebSocketClient();
final CountDownLatch openLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
WebSocket.OnTextMessage websocket = new WebSocket.OnTextMessage()
{
public void onClose(int closeCode, String message)
{
}
TrackingSocket wsocket = new TrackingSocket();
public void onMessage(String data)
{
// System.out.println("data = " + data);
dataLatch.countDown();
}
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
public void onOpen(Connection connection)
{
openLatch.countDown();
}
};
client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),websocket);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(wsocket.openLatch.await(1,TimeUnit.SECONDS));
Assert.assertTrue(openLatch.await(1,TimeUnit.SECONDS));
OutputStream serverOutput = socket.getOutputStream();
int length = bufferSize + (bufferSize / 2);
serverOutput.write(0x80 | 0x01); // FIN + TEXT
serverOutput.write(0x7E); // No MASK and 2 bytes length
serverOutput.write(length >> 8); // first length byte
serverOutput.write(length & 0xFF); // second length byte
int length = bufferSize + (bufferSize / 2); // 1.5 times buffer size
ssocket.write(0x80 | 0x01); // FIN + TEXT
ssocket.write(0x7E); // No MASK and 2 bytes length
ssocket.write(length >> 8); // first length byte
ssocket.write(length & 0xFF); // second length byte
for (int i = 0; i < length; ++i)
{
serverOutput.write('x');
ssocket.write('x');
}
serverOutput.flush();
ssocket.flush();
Assert.assertTrue(dataLatch.await(1000,TimeUnit.SECONDS));
factory.stop();
threadPool.stop();
Assert.assertTrue(wsocket.dataLatch.await(1000,TimeUnit.SECONDS));
}
@Test
public void testNotIdle() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
client.setMaxIdleTime(500);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setMaxIdleTime(500);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
final BlockingQueue<String> queue = new BlockingArrayQueue<String>();
final StringBuilder closeMessage = new StringBuilder();
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket.OnTextMessage()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
closeMessage.append(message);
_latch.countDown();
}
TrackingSocket wsocket = new TrackingSocket();
public void onMessage(String data)
{
queue.add(data);
}
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
public void onOpen(Connection connection)
{
open.set(true);
}
});
ServerConnection ssocket = server.accept();
ssocket.upgrade();
Socket socket = _server.accept();
accept(socket);
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
wsocket.assertIsOpen();
// Send some messages client to server
byte[] recv = new byte[1024];
@ -695,8 +597,8 @@ public class WebSocketClientTest
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
connection.sendMessage("Hello");
len = socket.getInputStream().read(recv,0,recv.length);
connection.write("Hello");
len = ssocket.getInputStream().read(recv,0,recv.length);
Assert.assertTrue(len > 0);
}
@ -707,65 +609,53 @@ public class WebSocketClientTest
for (int i = 0; i < 10; i++)
{
Thread.sleep(250);
socket.getOutputStream().write(send,0,send.length);
socket.getOutputStream().flush();
Assert.assertEquals("Hi",queue.poll(1,TimeUnit.SECONDS));
ssocket.write(send,0,send.length);
ssocket.flush();
Assert.assertEquals("Hi",wsocket.messageQueue.poll(1,TimeUnit.SECONDS));
}
// Close with code
long start = System.currentTimeMillis();
socket.getOutputStream().write(new byte[]
ssocket.write(new byte[]
{ (byte)0x88, (byte)0x02, (byte)4, (byte)87 },0,4);
socket.getOutputStream().flush();
ssocket.flush();
_latch.await(10,TimeUnit.SECONDS);
wsocket.closeLatch.await(10,TimeUnit.SECONDS);
Assert.assertTrue((System.currentTimeMillis() - start) < 5000);
Assert.assertEquals(1002,close.get());
Assert.assertEquals("Invalid close code 1111",closeMessage.toString());
wsocket.assertClose(StatusCode.PROTOCOL,"Invalid close code 1111");
}
@Test
public void testUpgradeThenTCPClose() throws Exception
{
OldWebSocketClient client = new OldWebSocketClient(_factory);
WebSocketClientFactory factory = new WebSocketClientFactory();
WebSocketClient client = factory.newWebSocketClient();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"),new WebSocket()
{
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
TrackingSocket wsocket = new TrackingSocket();
public void onOpen(Connection connection)
{
open.set(true);
}
});
URI wsUri = new URI("ws://127.0.0.1:" + server.getPort() + "/");
Future<WebSocketConnection> future = client.connect(wsUri,wsocket);
Socket socket = _server.accept();
accept(socket);
ServerConnection ssocket = server.accept();
ssocket.upgrade();
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
WebSocketConnection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
socket.close();
_latch.await(10,TimeUnit.SECONDS);
wsocket.assertIsOpen();
Assert.assertEquals(WebSocketConnectionRFC6455.CLOSE_NO_CLOSE,close.get());
ssocket.close();
wsocket.openLatch.await(10,TimeUnit.SECONDS);
wsocket.assertCloseCode(StatusCode.NO_CLOSE);
}
@Test
public void testURIWithDefaultPort() throws Exception
{
URI uri = new URI("ws://localhost");
InetSocketAddress addr = OldWebSocketClient.toSocketAddress(uri);
InetSocketAddress addr = WebSocketClient.toSocketAddress(uri);
Assert.assertThat("URI (" + uri + ").host",addr.getHostName(),is("localhost"));
Assert.assertThat("URI (" + uri + ").port",addr.getPort(),is(80));
}
@ -774,7 +664,7 @@ public class WebSocketClientTest
public void testURIWithDefaultWSSPort() throws Exception
{
URI uri = new URI("wss://localhost");
InetSocketAddress addr = OldWebSocketClient.toSocketAddress(uri);
InetSocketAddress addr = WebSocketClient.toSocketAddress(uri);
Assert.assertThat("URI (" + uri + ").host",addr.getHostName(),is("localhost"));
Assert.assertThat("URI (" + uri + ").port",addr.getPort(),is(443));
}

View File

@ -1,24 +1,183 @@
package org.eclipse.jetty.websocket.client.blockhead;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.websocket.api.AcceptHash;
/**
* A overly simplistic websocket server used during testing.
* <p>
* This is not meant to be performant or accurate.
* In fact, having the server misbehave is a useful trait during testing.
* This is not meant to be performant or accurate. In fact, having the server misbehave is a useful trait during testing.
*/
public class BlockheadServer
{
public static class ServerConnection
{
public void start()
public void close()
{
// TODO Auto-generated method stub
}
public void flush()
{
// TODO Auto-generated method stub
}
public InputStream getInputStream()
{
// TODO Auto-generated method stub
return null;
}
public void respond(String rawstr)
{
// TODO Auto-generated method stub
}
public void setSoTimeout(int ms)
{
// TODO Auto-generated method stub
}
public void upgrade() throws IOException
{
String key = "not sent";
BufferedReader in = new BufferedReader(new InputStreamReader(getInputStream()));
for (String line = in.readLine(); line != null; line = in.readLine())
{
if (line.length() == 0)
{
break;
}
if (line.startsWith("Sec-WebSocket-Key:"))
{
key = line.substring(18).trim();
}
}
StringBuilder resp = new StringBuilder();
resp.append("HTTP/1.1 101 Upgrade\r\n");
resp.append("Sec-WebSocket-Accept: ");
resp.append(AcceptHash.hashKey(key));
resp.append("\r\n");
resp.append("\r\n");
write(resp.toString().getBytes());
}
private void write(byte[] bytes)
{
// TODO Auto-generated method stub
}
public void write(byte[] buf, int offset, int length)
{
// TODO Auto-generated method stub
}
public void write(int b)
{
// TODO Auto-generated method stub
}
}
public ServerConnection accept()
{
// TODO Auto-generated method stub
return null;
}
public void accept(Socket connection) throws IOException
{
String key = "not sent";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line = in.readLine(); line != null; line = in.readLine())
{
if (line.length() == 0)
{
break;
}
if (line.startsWith("Sec-WebSocket-Key:"))
{
key = line.substring(18).trim();
}
}
StringBuilder resp = new StringBuilder();
resp.append("HTTP/1.1 101 Upgrade\r\n");
resp.append("Sec-WebSocket-Accept: ");
resp.append(AcceptHash.hashKey(key));
resp.append("\r\n");
resp.append("\r\n");
connection.getOutputStream().write(resp.toString().getBytes());
}
public void close()
{
// TODO Auto-generated method stub
}
public int getPort()
{
// TODO Auto-generated method stub
return -1;
}
public void respondToClient(Socket connection, String serverResponse) throws IOException
{
InputStream in = null;
InputStreamReader isr = null;
BufferedReader buf = null;
OutputStream out = null;
try
{
in = connection.getInputStream();
isr = new InputStreamReader(in);
buf = new BufferedReader(isr);
String line;
while ((line = buf.readLine()) != null)
{
// System.err.println(line);
if (line.length() == 0)
{
// Got the "\r\n" line.
break;
}
}
// System.out.println("[Server-Out] " + serverResponse);
out = connection.getOutputStream();
out.write(serverResponse.getBytes());
out.flush();
}
finally
{
IO.close(buf);
IO.close(isr);
IO.close(in);
IO.close(out);
}
}
public void start()
{
// TODO Auto-generated method stub
}
}

View File

@ -0,0 +1,335 @@
/*******************************************************************************
* Copyright (c) 2011 Intalio, Inc.
* ======================================================================
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* and Apache License v2.0 which accompanies this distribution.
*
* The Eclipse Public License is available at
* http://www.eclipse.org/legal/epl-v10.html
*
* The Apache License v2.0 is available at
* http://www.opensource.org/licenses/apache2.0.php
*
* You may elect to redistribute this code under either of these licenses.
*******************************************************************************/
package org.eclipse.jetty.websocket.client.examples;
import java.net.InetSocketAddress;
import java.net.URI;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.websocket.api.OpCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
import org.eclipse.jetty.websocket.client.WebSocketClient;
import org.eclipse.jetty.websocket.client.WebSocketClientFactory;
/**
* This is not a general purpose websocket client. It's only for testing the websocket server and is hardwired to a specific draft version of the protocol.
*/
public class TestClient
{
public class TestSocket extends WebSocketAdapter
{
public void disconnect() throws Exception
{
super.getConnection().close();
}
@Override
public void onWebSocketBinary(byte[] payload, int offset, int len)
{
// TODO
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
// TODO Auto-generated method stub
super.onWebSocketClose(statusCode,reason);
}
@Override
public void onWebSocketConnect(WebSocketConnection connection)
{
if (_verbose)
{
System.err.printf("%s#onWebSocketConnect %s %s\n",this.getClass().getSimpleName(),connection,connection.getClass().getSimpleName());
}
}
public void send(OpCode op, byte[] data, int maxFragmentLength)
{
_starts.add(System.nanoTime());
int off = 0;
int len = data.length;
if ((maxFragmentLength > 0) && (len > maxFragmentLength))
{
len = maxFragmentLength;
}
__messagesSent++;
while (off < data.length)
{
__framesSent++;
byte flags = (byte)((off + len) == data.length?0x8:0);
// byte op = (byte)(off == 0?opcode:WebSocketConnectionRFC6455.OP_CONTINUATION);
if (_verbose)
{
// System.err.printf("%s#sendFrame %s|%s %s\n",
// this.getClass().getSimpleName(),
// TypeUtil.toHexString(flags),
// TypeUtil.toHexString(op),
// TypeUtil.toHexString(data,off,len));
}
// _connection.sendFrame(flags,op,data,off,len);
off += len;
if ((data.length - off) > len)
{
len = data.length - off;
}
if ((maxFragmentLength > 0) && (len > maxFragmentLength))
{
len = maxFragmentLength;
}
}
}
}
private static boolean _verbose = false;
private static final Random __random = new Random();
private final String _host;
private final int _port;
private final String _protocol;
private final int _timeout;
private static boolean __quiet;
private static int __framesSent;
private static int __messagesSent;
private static AtomicInteger __framesReceived = new AtomicInteger();
private static AtomicInteger __messagesReceived = new AtomicInteger();
private static AtomicLong __totalTime = new AtomicLong();
private static AtomicLong __minDuration = new AtomicLong(Long.MAX_VALUE);
private static AtomicLong __maxDuration = new AtomicLong(Long.MIN_VALUE);
private static long __start;
public static void main(String[] args) throws Exception
{
String host = "localhost";
int port = 8080;
String protocol = null;
int count = 10;
int size = 64;
int fragment = 4000;
boolean binary = false;
int clients = 1;
int delay = 1000;
for (int i = 0; i < args.length; i++)
{
String a = args[i];
if ("-p".equals(a) || "--port".equals(a))
{
port = Integer.parseInt(args[++i]);
}
else if ("-h".equals(a) || "--host".equals(a))
{
host = args[++i];
}
else if ("-c".equals(a) || "--count".equals(a))
{
count = Integer.parseInt(args[++i]);
}
else if ("-s".equals(a) || "--size".equals(a))
{
size = Integer.parseInt(args[++i]);
}
else if ("-f".equals(a) || "--fragment".equals(a))
{
fragment = Integer.parseInt(args[++i]);
}
else if ("-P".equals(a) || "--protocol".equals(a))
{
protocol = args[++i];
}
else if ("-v".equals(a) || "--verbose".equals(a))
{
_verbose = true;
}
else if ("-b".equals(a) || "--binary".equals(a))
{
binary = true;
}
else if ("-C".equals(a) || "--clients".equals(a))
{
clients = Integer.parseInt(args[++i]);
}
else if ("-d".equals(a) || "--delay".equals(a))
{
delay = Integer.parseInt(args[++i]);
}
else if ("-q".equals(a) || "--quiet".equals(a))
{
__quiet = true;
}
else if (a.startsWith("-"))
{
usage(args);
}
}
TestClient[] client = new TestClient[clients];
WebSocketClientFactory factory = new WebSocketClientFactory();
try
{
__start = System.currentTimeMillis();
protocol = protocol == null?"echo":protocol;
for (int i = 0; i < clients; i++)
{
client[i] = new TestClient(factory,host,port,protocol,60000);
client[i].open();
}
System.out.println("Jetty WebSocket PING " + host + ":" + port + " (" + new InetSocketAddress(host,port) + ") " + clients + " clients " + protocol);
for (int p = 0; p < count; p++)
{
long next = System.currentTimeMillis() + delay;
OpCode op = OpCode.TEXT;
if (binary)
{
op = OpCode.BINARY;
}
byte data[] = null;
switch (op)
{
case TEXT:
{
StringBuilder b = new StringBuilder();
while (b.length() < size)
{
b.append('A' + __random.nextInt(26));
}
data = b.toString().getBytes(StringUtil.__UTF8_CHARSET);
break;
}
case BINARY:
{
data = new byte[size];
__random.nextBytes(data);
break;
}
}
for (int i = 0; i < clients; i++)
{
client[i].send(op,data,fragment);
}
while (System.currentTimeMillis() < next)
{
Thread.sleep(10);
}
}
}
finally
{
for (int i = 0; i < clients; i++)
{
if (client[i] != null)
{
client[i].disconnect();
}
}
long duration = System.currentTimeMillis() - __start;
System.out.println("--- " + host + " websocket ping statistics using " + clients + " connection" + (clients > 1?"s":"") + " ---");
System.out.printf("%d/%d frames sent/recv, %d/%d mesg sent/recv, time %dms %dm/s %.2fbps%n",__framesSent,__framesReceived.get(),__messagesSent,
__messagesReceived.get(),duration,((1000L * __messagesReceived.get()) / duration),(1000.0D * __messagesReceived.get() * 8 * size)
/ duration / 1024 / 1024);
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get() / 1000000.0,__messagesReceived.get() == 0?0.0:(__totalTime.get()
/ __messagesReceived.get() / 1000000.0),__maxDuration.get() / 1000000.0);
factory.stop();
}
}
private static void usage(String[] args)
{
System.err.println("ERROR: " + Arrays.asList(args));
System.err.println("USAGE: java -cp CLASSPATH " + TestClient.class + " [ OPTIONS ]");
System.err.println(" -h|--host HOST (default localhost)");
System.err.println(" -p|--port PORT (default 8080)");
System.err.println(" -b|--binary");
System.err.println(" -v|--verbose");
System.err.println(" -q|--quiet");
System.err.println(" -c|--count n (default 10)");
System.err.println(" -s|--size n (default 64)");
System.err.println(" -f|--fragment n (default 4000) ");
System.err.println(" -P|--protocol echo|echo-assemble|echo-fragment|echo-broadcast");
System.err.println(" -C|--clients n (default 1) ");
System.err.println(" -d|--delay n (default 1000ms) ");
System.exit(1);
}
private BlockingQueue<Long> _starts = new LinkedBlockingQueue<Long>();
int _messageBytes;
int _frames;
byte _opcode = -1;
private final CountDownLatch _handshook = new CountDownLatch(1);
private WebSocketClientFactory factory;
private TestSocket socket;
public TestClient(WebSocketClientFactory factory, String host, int port, String protocol, int timeoutMS) throws Exception
{
this.factory = factory;
_host = host;
_port = port;
_protocol = protocol;
_timeout = timeoutMS;
}
private void disconnect()
{
// TODO Auto-generated method stub
}
private void open() throws Exception
{
WebSocketClient client = factory.newWebSocketClient();
client.getPolicy().setMaxIdleTime(_timeout);
client.setProtocol(_protocol);
socket = new TestSocket();
URI wsUri = new URI("ws://" + _host + ":" + _port + "/");
client.connect(wsUri,socket).get(10,TimeUnit.SECONDS);
}
private void send(OpCode op, byte[] data, int fragment)
{
socket.send(op,data,fragment);
}
}