353073 factory pattern for clients

This commit is contained in:
Greg Wilkins 2011-08-26 11:50:33 +10:00
parent f9eafd34eb
commit c99498af91
17 changed files with 533 additions and 524 deletions

View File

@ -50,12 +50,25 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
private volatile int _capacity;
private Object[] _elements;
private int _head;
private int _tail;
private final ReentrantLock _headLock = new ReentrantLock();
private final Condition _notEmpty = _headLock.newCondition();
private int _head;
// spacers created to prevent false sharing between head and tail http://en.wikipedia.org/wiki/False_sharing
// TODO verify this has benefits
private long _space0;
private long _space1;
private long _space2;
private long _space3;
private long _space4;
private long _space5;
private long _space6;
private long _space7;
private final ReentrantLock _tailLock = new ReentrantLock();
private int _tail;
/* ------------------------------------------------------------ */
/** Create a growing partially blocking Queue
@ -675,4 +688,12 @@ public class BlockingArrayQueue<E> extends AbstractList<E> implements BlockingQu
_tailLock.unlock();
}
}
/* ------------------------------------------------------------ */
long sumOfSpace()
{
// this method exists to stop clever optimisers removing the spacers
return _space0++ +_space1++ +_space2++ +_space3++ +_space4++ +_space5++ +_space6++ +_space7++;
}
}

View File

@ -0,0 +1,24 @@
package org.eclipse.jetty.websocket;
public class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}

View File

@ -0,0 +1,6 @@
package org.eclipse.jetty.websocket;
public interface MaskGen
{
void genMask(byte[] mask);
}

View File

@ -0,0 +1,23 @@
package org.eclipse.jetty.websocket;
import java.util.Random;
public class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new Random();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}

View File

@ -22,7 +22,7 @@ import org.eclipse.jetty.util.TypeUtil;
*/
public class TestClient implements WebSocket.OnFrame
{
private static WebSocketClient __client = new WebSocketClient();
private static WebSocketClientFactory __clientFactory = new WebSocketClientFactory();
private static boolean _verbose=false;
private static final Random __random = new Random();
@ -119,7 +119,7 @@ public class TestClient implements WebSocket.OnFrame
private void open() throws Exception
{
WebSocketClient client = new WebSocketClient(__client);
WebSocketClient client = new WebSocketClient(__clientFactory);
client.setProtocol(_protocol);
client.setMaxIdleTime(_timeout);
client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
@ -179,7 +179,7 @@ public class TestClient implements WebSocket.OnFrame
public static void main(String[] args) throws Exception
{
__client.start();
__clientFactory.start();
String host="localhost";
int port=8080;
@ -277,7 +277,7 @@ public class TestClient implements WebSocket.OnFrame
"time "+duration+"ms "+ (1000L*__messagesReceived.get()/duration)+" req/s");
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",__minDuration.get()/1000000.0,__messagesReceived.get()==0?0.0:(__totalTime.get()/__messagesReceived.get()/1000000.0),__maxDuration.get()/1000000.0);
__client.stop();
__clientFactory.stop();
}
}

View File

@ -1,12 +1,10 @@
package org.eclipse.jetty.websocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
@ -19,24 +17,8 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
/* ------------------------------------------------------------ */
@ -44,8 +26,9 @@ import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
* <p>This WebSocket Client class can create multiple websocket connections to multiple destinations.
* It uses the same {@link WebSocket} endpoint API as the server.
* Simple usage is as follows: <pre>
* WebSocketClient client = new WebSocketClient();
* client.setMaxIdleTime(500);
* WebSocketClientFactory factory = new WebSocketClientFactory();
* factory.start();
* WebSocketClient client = factory.newClient();
* client.start();
*
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"),new WebSocket.OnTextMessage()
@ -69,84 +52,38 @@ import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
* connection.sendMessage("Hello World");
* </pre>
*/
public class WebSocketClient extends AggregateLifeCycle
public class WebSocketClient
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getName());
private final static Random __random = new Random();
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final WebSocketClient _root;
private final WebSocketClient _parent;
private final ThreadPool _threadPool;
private final WebSocketClientSelector _selector;
private final WebSocketClientFactory _factory;
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
private int _bufferSize=64*1024;
private String _origin;
private String _protocol;
private int _maxIdleTime=-1;
private WebSocketBuffers _buffers;
private boolean _maskingEnabled = true;
private MaskGen _maskGen;
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with default configuration.
/** Create a WebSocket Client with private factory.
* <p>Creates a WebSocketClient from a private WebSocketClientFactory. This can be wasteful of resources if many clients are created.
*/
public WebSocketClient()
public WebSocketClient() throws Exception
{
this(new QueuedThreadPool());
_factory=new WebSocketClientFactory();
_factory.start();
_maskGen=_factory.getMaskGen();
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with shared threadpool.
/** Create a WebSocket Client with shared factory.
* @param threadpool
*/
public WebSocketClient(ThreadPool threadpool)
public WebSocketClient(WebSocketClientFactory factory)
{
_root=this;
_parent=null;
_threadPool=threadpool;
_selector=new WebSocketClientSelector();
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client from another.
* <p>If multiple clients are required so that connections created may have different
* configurations, then it is more efficient to create a client based on another, so
* that the thread pool and IO infrastructure may be shared.
*/
public WebSocketClient(WebSocketClient parent)
{
_root=parent._root;
_parent=parent;
_threadPool=parent._threadPool;
_selector=parent._selector;
_parent.addBean(this);
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/** Get the ThreadPool.
* <p>Used to set/query the thread pool configuration.
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
_factory=factory;
_maskGen=_factory.getMaskGen();
}
/* ------------------------------------------------------------ */
@ -167,26 +104,6 @@ public class WebSocketClient extends AggregateLifeCycle
_maxIdleTime=maxIdleTime;
}
/* ------------------------------------------------------------ */
/** Get the WebSocket Buffer size for connections opened by this client.
* @return the buffer size in bytes.
*/
public int getBufferSize()
{
return _bufferSize;
}
/* ------------------------------------------------------------ */
/** Set the WebSocket Buffer size for connections opened by this client.
* @param bufferSize the buffer size in bytes.
*/
public void setBufferSize(int bufferSize)
{
if (isRunning())
throw new IllegalStateException(getState());
_bufferSize = bufferSize;
}
/* ------------------------------------------------------------ */
/** Get the subprotocol string for connections opened by this client.
* @return The subprotocol
@ -235,23 +152,16 @@ public class WebSocketClient extends AggregateLifeCycle
return _extensions;
}
/* ------------------------------------------------------------ */
/**
* @return whether masking is enabled.
*/
public boolean isMaskingEnabled()
public MaskGen getMaskGen()
{
return _maskingEnabled;
return _maskGen;
}
/* ------------------------------------------------------------ */
/**
* @param maskingEnabled whether to enable masking
*/
public void setMaskingEnabled(boolean maskingEnabled)
public void setMaskGen(MaskGen maskGen)
{
_maskingEnabled=maskingEnabled;
_maskGen = maskGen;
}
/* ------------------------------------------------------------ */
@ -297,8 +207,8 @@ public class WebSocketClient extends AggregateLifeCycle
*/
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
if (!_factory.isStarted())
throw new IllegalStateException("Factory !started");
String scheme=uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
@ -309,293 +219,32 @@ public class WebSocketClient extends AggregateLifeCycle
channel.socket().setTcpNoDelay(true);
int maxIdleTime = getMaxIdleTime();
if (maxIdleTime<0)
maxIdleTime=(int)_selector.getMaxIdleTime();
maxIdleTime=(int)_factory.getSelectorManager().getMaxIdleTime();
if (maxIdleTime>0)
channel.socket().setSoTimeout(maxIdleTime);
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,_origin,_maskGen,maxIdleTime,_cookies,_extensions,channel);
channel.configureBlocking(false);
channel.connect(address);
_selector.register( channel, holder);
_factory.getSelectorManager().register( channel, holder);
return holder;
}
@Override
protected void doStart() throws Exception
{
if (_parent!=null && !_parent.isRunning())
throw new IllegalStateException("parent:"+getState());
_buffers = new WebSocketBuffers(_bufferSize);
super.doStart();
// Start a selector and timer if this is the root client
if (_parent==null)
{
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
}
catch (IOException e)
{
__log.warn(e);
}
}
}
});
}
}
}
/* ------------------------------------------------------------ */
/** WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet,sKey);
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketFuture holder = (WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().closed();
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketFuture))
super.connectionFailed(channel,ex,attachment);
else
{
__log.debug(ex);
WebSocketFuture holder = (WebSocketFuture)attachment;
holder.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketFuture _holder;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketFuture holder)
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
_holder=holder;
byte[] bytes=new byte[16];
__random.nextBytes(bytes);
_key=new String(B64Code.encode(bytes));
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
_parser=new HttpParser(buffers,_endp,
new HttpParser.EventHandler()
{
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
if (status!=101)
{
_error="Bad response status "+status+" "+reason;
_endp.close();
}
}
@Override
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
if (__ACCEPT.equals(name))
_accept=value.toString();
}
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
if (_error==null)
_error="Bad response: "+method+" "+url+" "+version;
_endp.close();
}
@Override
public void content(Buffer ref) throws IOException
{
if (_error==null)
_error="Bad response. "+ref.length()+"B of content?";
_endp.close();
}
});
String path=_holder.getURI().getPath();
if (path==null || path.length()==0)
path="/";
String request=
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+holder.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
(_origin==null?"":"Origin: "+_origin+"\r\n")+
"Sec-WebSocket-Version: "+WebSocketConnectionD12.VERSION+"\r\n";
if (holder.getProtocol()!=null)
request+="Sec-WebSocket-Protocol: "+holder.getProtocol()+"\r\n";
if (holder.getCookies()!=null && holder.getCookies().size()>0)
{
for (String cookie : holder.getCookies().keySet())
request+="Cookie: "+QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM)+
"="+
QuotedStringTokenizer.quoteIfNeeded(holder.getCookies().get(cookie),HttpFields.__COOKIE_DELIM)+
"\r\n";
}
request+="\r\n";
// TODO extensions
try
{
Buffer handshake = new ByteArrayBuffer(request,false);
int len=handshake.length();
if (len!=_endp.flush(handshake))
throw new IOException("incomplete");
}
catch(IOException e)
{
holder.handshakeFailed(e);
}
}
public Connection handle() throws IOException
{
while (_endp.isOpen() && !_parser.isComplete())
{
switch (_parser.parseAvailable())
{
case -1:
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
default:
break;
}
}
if (_error==null)
{
if (_accept==null)
_error="No Sec-WebSocket-Accept";
else if (!WebSocketConnectionD12.hashKey(_key).equals(_accept))
_error="Bad Sec-WebSocket-Accept";
else
{
Buffer header=_parser.getHeaderBuffer();
MaskGen maskGen=_maskingEnabled?new WebSocketGeneratorD12.RandomMaskGen():new WebSocketGeneratorD12.NullMaskGen();
WebSocketConnectionD12 connection = new WebSocketConnectionD12(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10,maskGen);
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_holder.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
}
public boolean isIdle()
{
return false;
}
public boolean isSuspended()
{
return false;
}
public void closed()
{
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
}
}
/* ------------------------------------------------------------ */
/** The Future Websocket Connection.
*/
class WebSocketFuture implements Future<WebSocket.Connection>
static class WebSocketFuture implements Future<WebSocket.Connection>
{
final WebSocket _websocket;;
final URI _uri;
final String _protocol;
final String _origin;
final MaskGen _maskGen;
final int _maxIdleTime;
final Map<String,String> _cookies;
final List<String> _extensions;
@ -605,11 +254,13 @@ public class WebSocketClient extends AggregateLifeCycle
WebSocketConnection _connection;
Throwable _exception;
public WebSocketFuture(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
private WebSocketFuture(WebSocket websocket, URI uri, String protocol, String origin, MaskGen maskGen, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
{
_websocket=websocket;
_uri=uri;
_protocol=protocol;
_origin=origin;
_maskGen=maskGen;
_maxIdleTime=maxIdleTime;
_cookies=cookies;
_extensions=extensions;
@ -695,6 +346,16 @@ public class WebSocketClient extends AggregateLifeCycle
return _maxIdleTime;
}
public String getOrigin()
{
return _origin;
}
public MaskGen getMaskGen()
{
return _maskGen;
}
public String toString()
{
return "[" + _uri + ","+_websocket+"]@"+hashCode();
@ -804,6 +465,6 @@ public class WebSocketClient extends AggregateLifeCycle
__log.debug(e);
}
}
}
}
}

View File

@ -0,0 +1,380 @@
package org.eclipse.jetty.websocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ProtocolException;
import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
/* ------------------------------------------------------------ */
/** WebSocket Client Factory.
* The WebSocketClientFactory contains the common mechanisms for multiple WebSocketClient instances (eg threadpool, NIO selector).
* WebSocketClients with different configurations should share the same factory to avoid wasted resources.
* @see WebSocketClient
*/
public class WebSocketClientFactory extends AggregateLifeCycle
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClientFactory.class.getName());
private final static Random __random = new Random();
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final ThreadPool _threadPool;
private final WebSocketClientSelector _selector;
private final WebSocketBuffers _buffers;
private final MaskGen _maskGen;
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with default configuration.
*/
public WebSocketClientFactory()
{
this(new QueuedThreadPool(),new RandomMaskGen(),16*1024);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with ThreadPool .
*/
public WebSocketClientFactory(ThreadPool threadPool)
{
this(threadPool,new RandomMaskGen(),16*1024);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with shared threadpool.
* @param threadpool
*/
public WebSocketClientFactory(ThreadPool threadpool,MaskGen maskGen,int bufferSize)
{
_threadPool=threadpool;
_selector=new WebSocketClientSelector();
_buffers=new WebSocketBuffers(bufferSize);
_maskGen=maskGen;
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/** Get the ThreadPool.
* <p>Used to set/query the thread pool configuration.
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
}
/* ------------------------------------------------------------ */
public MaskGen getMaskGen()
{
return _maskGen;
}
/* ------------------------------------------------------------ */
public WebSocketClient newWebSocketClient()
{
return new WebSocketClient(this);
}
/* ------------------------------------------------------------ */
@Override
protected void doStart() throws Exception
{
super.doStart();
// Start a selector threads
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
}
catch (IOException e)
{
__log.warn(e);
}
}
}
});
}
}
/* ------------------------------------------------------------ */
/** WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
{
return _threadPool.dispatch(task);
}
@Override
protected SelectChannelEndPoint newEndPoint(SocketChannel channel, SelectSet selectSet, final SelectionKey sKey) throws IOException
{
return new SelectChannelEndPoint(channel,selectSet,sKey);
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketClient.WebSocketFuture holder = (WebSocketClient.WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class ??
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
}
@Override
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
endpoint.getConnection().closed();
}
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketClient.WebSocketFuture))
super.connectionFailed(channel,ex,attachment);
else
{
__log.debug(ex);
WebSocketClient.WebSocketFuture future = (WebSocketClient.WebSocketFuture)attachment;
future.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketClient.WebSocketFuture _holder;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketClient.WebSocketFuture future)
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
_holder=future;
byte[] bytes=new byte[16];
__random.nextBytes(bytes);
_key=new String(B64Code.encode(bytes));
Buffers buffers = new SimpleBuffers(_buffers.getBuffer(),null);
_parser=new HttpParser(buffers,_endp,
new HttpParser.EventHandler()
{
@Override
public void startResponse(Buffer version, int status, Buffer reason) throws IOException
{
if (status!=101)
{
_error="Bad response status "+status+" "+reason;
_endp.close();
}
}
@Override
public void parsedHeader(Buffer name, Buffer value) throws IOException
{
if (__ACCEPT.equals(name))
_accept=value.toString();
}
@Override
public void startRequest(Buffer method, Buffer url, Buffer version) throws IOException
{
if (_error==null)
_error="Bad response: "+method+" "+url+" "+version;
_endp.close();
}
@Override
public void content(Buffer ref) throws IOException
{
if (_error==null)
_error="Bad response. "+ref.length()+"B of content?";
_endp.close();
}
});
String path=_holder.getURI().getPath();
if (path==null || path.length()==0)
path="/";
String origin = future.getOrigin();
String request=
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+future.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
(origin==null?"":"Origin: "+origin+"\r\n")+
"Sec-WebSocket-Version: "+WebSocketConnectionD12.VERSION+"\r\n";
if (future.getProtocol()!=null)
request+="Sec-WebSocket-Protocol: "+future.getProtocol()+"\r\n";
if (future.getCookies()!=null && future.getCookies().size()>0)
{
for (String cookie : future.getCookies().keySet())
request+="Cookie: "+QuotedStringTokenizer.quoteIfNeeded(cookie,HttpFields.__COOKIE_DELIM)+
"="+
QuotedStringTokenizer.quoteIfNeeded(future.getCookies().get(cookie),HttpFields.__COOKIE_DELIM)+
"\r\n";
}
request+="\r\n";
// TODO extensions
try
{
Buffer handshake = new ByteArrayBuffer(request,false);
int len=handshake.length();
if (len!=_endp.flush(handshake))
throw new IOException("incomplete");
}
catch(IOException e)
{
future.handshakeFailed(e);
}
}
public Connection handle() throws IOException
{
while (_endp.isOpen() && !_parser.isComplete())
{
switch (_parser.parseAvailable())
{
case -1:
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
default:
break;
}
}
if (_error==null)
{
if (_accept==null)
_error="No Sec-WebSocket-Accept";
else if (!WebSocketConnectionD12.hashKey(_key).equals(_accept))
_error="Bad Sec-WebSocket-Accept";
else
{
Buffer header=_parser.getHeaderBuffer();
MaskGen maskGen=_holder.getMaskGen();
WebSocketConnectionD12 connection = new WebSocketConnectionD12(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10,maskGen);
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_holder.onConnection(connection);
return connection;
}
}
_endp.close();
return this;
}
public boolean isIdle()
{
return false;
}
public boolean isSuspended()
{
return false;
}
public void closed()
{
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
}
}
}

View File

@ -37,7 +37,6 @@ import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocketGeneratorD12.MaskGen;
public class WebSocketConnectionD12 extends AbstractConnection implements WebSocketConnection
{

View File

@ -38,61 +38,6 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator
private int _m;
private boolean _opsent;
private final MaskGen _maskGen;
public interface MaskGen
{
void genMask(byte[] mask);
}
public static class NullMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}
public static class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}
public static class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new SecureRandom();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}
public WebSocketGeneratorD06(WebSocketBuffers buffers, EndPoint endp)
{

View File

@ -14,7 +14,6 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
@ -38,61 +37,6 @@ public class WebSocketGeneratorD12 implements WebSocketGenerator
private boolean _opsent;
private final MaskGen _maskGen;
public interface MaskGen
{
void genMask(byte[] mask);
}
public static class NullMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}
public static class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}
public static class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new Random();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}
public WebSocketGeneratorD12(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;

View File

@ -0,0 +1,10 @@
package org.eclipse.jetty.websocket;
public class ZeroMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}

View File

@ -27,28 +27,32 @@ import org.junit.Test;
public class WebSocketClientTest
{
private WebSocketClientFactory _factory = new WebSocketClientFactory();
private ServerSocket _server;
private int _serverPort;
@Before
public void startServer() throws IOException {
public void startServer() throws Exception
{
_server = new ServerSocket();
_server.bind(null);
_serverPort = _server.getLocalPort();
_factory.start();
}
@After
public void stopServer() throws IOException {
public void stopServer() throws Exception
{
if(_server != null) {
_server.close();
}
_factory.stop();
}
@Test
public void testBadURL() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
@ -79,8 +83,7 @@ public class WebSocketClientTest
@Test
public void testAsyncConnectionRefused() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -120,8 +123,7 @@ public class WebSocketClientTest
@Test
public void testConnectionNotAccepted() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -159,8 +161,7 @@ public class WebSocketClientTest
@Test
public void testConnectionTimeout() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -200,8 +201,7 @@ public class WebSocketClientTest
@Test
public void testBadHandshake() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -242,8 +242,7 @@ public class WebSocketClientTest
@Test
public void testBadUpgrade() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -285,8 +284,7 @@ public class WebSocketClientTest
@Test
public void testUpgradeThenTCPClose() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
WebSocketClient client = new WebSocketClient(_factory);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -323,9 +321,8 @@ public class WebSocketClientTest
@Test
public void testIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
WebSocketClient client = new WebSocketClient(_factory);
client.setMaxIdleTime(500);
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
@ -362,9 +359,8 @@ public class WebSocketClientTest
@Test
public void testNotIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
WebSocketClient client = new WebSocketClient(_factory);
client.setMaxIdleTime(500);
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();

View File

@ -20,7 +20,7 @@ public class WebSocketGeneratorD06Test
byte[] _mask = new byte[4];
int _m;
public WebSocketGeneratorD06.MaskGen _maskGen = new WebSocketGeneratorD06.FixedMaskGen(
public MaskGen _maskGen = new FixedMaskGen(
new byte[]{(byte)0x00,(byte)0x00,(byte)0x0f,(byte)0xff});
@Before

View File

@ -20,7 +20,7 @@ public class WebSocketGeneratorD12Test
byte[] _mask = new byte[4];
int _m;
public WebSocketGeneratorD12.MaskGen _maskGen = new WebSocketGeneratorD12.FixedMaskGen(
public MaskGen _maskGen = new FixedMaskGen(
new byte[]{(byte)0x00,(byte)0x00,(byte)0x0f,(byte)0xff});
@Before

View File

@ -167,7 +167,7 @@ public class WebSocketLoadD12Test
this.iterations = iterations;
_endp=new SocketEndPoint(socket);
_generator = new WebSocketGeneratorD12(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD12.FixedMaskGen());
_generator = new WebSocketGeneratorD12(new WebSocketBuffers(32*1024),_endp,new FixedMaskGen());
_parser = new WebSocketParserD12(new WebSocketBuffers(32*1024),_endp,_handler,false);
}

View File

@ -662,7 +662,7 @@ public class WebSocketMessageD06Test
final AtomicReference<String> received = new AtomicReference<String>();
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD06.MaskGen maskGen = new WebSocketGeneratorD06.RandomMaskGen();
MaskGen maskGen = new RandomMaskGen();
WebSocketGeneratorD06 gen = new WebSocketGeneratorD06(new WebSocketBuffers(8096),endp,maskGen);
byte[] data = message.getBytes(StringUtil.__UTF8);

View File

@ -861,7 +861,7 @@ public class WebSocketMessageD12Test
final AtomicReference<String> received = new AtomicReference<String>();
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD12.MaskGen maskGen = new WebSocketGeneratorD12.RandomMaskGen();
MaskGen maskGen = new RandomMaskGen();
WebSocketGeneratorD12 gen = new WebSocketGeneratorD12(new WebSocketBuffers(8096),endp,maskGen);
byte[] data = message.getBytes(StringUtil.__UTF8);