353073 Improved client API to use futures

This commit is contained in:
Greg Wilkins 2011-08-11 11:50:23 +10:00
parent 5ea2806c08
commit 6b9ecd0806
15 changed files with 601 additions and 685 deletions

View File

@ -103,14 +103,6 @@ public class WebSocketUpgradeTest
_results.add("clientWS.onMessage");
_results.add(data);
}
/* ------------------------------------------------------------ */
public void onError(String message, Throwable ex)
{
_results.add("clientWS.onError");
_results.add(message);
_results.add(ex);
}
};
@ -252,19 +244,11 @@ public class WebSocketUpgradeTest
_results.add("serverWS.onMessage");
_results.add(data);
}
/* ------------------------------------------------------------ */
public void onError(String message, Throwable ex)
{
_results.add("serverWS.onError");
_results.add(message);
_results.add(ex);
}
/* ------------------------------------------------------------ */
public void onClose(int code, String message)
{
_results.add("onDisconnect");
_results.add("onClose");
_webSockets.remove(this);
}

View File

@ -76,6 +76,19 @@ public class ByteArrayBuffer extends AbstractBuffer
_access=IMMUTABLE;
_string = value;
}
public ByteArrayBuffer(String value,boolean immutable)
{
super(READWRITE,NON_VOLATILE);
_bytes = StringUtil.getBytes(value);
setGetIndex(0);
setPutIndex(_bytes.length);
if (immutable)
{
_access=IMMUTABLE;
_string = value;
}
}
public ByteArrayBuffer(String value,String encoding) throws UnsupportedEncodingException
{

View File

@ -68,14 +68,6 @@ public class TestClient implements WebSocket.OnFrame
{
}
public void onError(String message, Throwable ex)
{
System.err.println("onError: "+message);
if (ex!=null)
ex.printStackTrace();
_handshook.countDown();
}
public void onClose(int closeCode, String message)
{
_handshook.countDown();
@ -141,8 +133,10 @@ public class TestClient implements WebSocket.OnFrame
private void open() throws Exception
{
__client.open(new URI("ws://"+_host+":"+_port+"/"),this,_protocol,_timeout);
_handshook.await(10,TimeUnit.SECONDS);
WebSocketClient client = new WebSocketClient(__client);
client.setProtocol(_protocol);
client.setMaxIdleTime(_timeout);
client.open(new URI("ws://"+_host+":"+_port+"/"),this).get(10,TimeUnit.SECONDS);
}
public void ping(byte opcode,byte[] data,int fragment) throws Exception

View File

@ -107,12 +107,6 @@ public class TestServer extends Server
System.err.printf("%s#onOpen %s\n",this.getClass().getSimpleName(),connection);
}
public void onError(String message, Throwable ex)
{
if (_verbose)
System.err.printf("%s#onOpen %s\n",this.getClass().getSimpleName(),message);
}
public void onHandshake(FrameConnection connection)
{
if (_verbose)

View File

@ -29,13 +29,6 @@ public interface WebSocket
*/
void onOpen(Connection connection);
/**
* Called when a new websocket connection cannot be created
* @param message The error message
* @param ex The exception or null
*/
void onError(String message, Throwable ex);
/**
* Called when an established websocket connection closes
* @param closeCode

View File

@ -7,10 +7,16 @@ import java.net.URI;
import java.nio.channels.ByteChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import java.nio.channels.UnsupportedAddressTypeException;
import java.util.List;
import java.util.Map;
import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.eclipse.jetty.http.HttpFields;
import org.eclipse.jetty.http.HttpParser;
@ -21,7 +27,6 @@ import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.io.View;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.util.B64Code;
@ -33,32 +38,54 @@ import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.Timeout;
public class WebSocketClient extends AggregateLifeCycle
{
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getCanonicalName());
private final static Random __random = new Random();
private final static ByteArrayBuffer __ACCEPT = new ByteArrayBuffer.CaseInsensitive("Sec-WebSocket-Accept");
private final WebSocketClient _root;
private final WebSocketClient _parent;
private final ThreadPool _threadPool;
private final Selector _selector=new Selector();
private final Timeout _connectQ=new Timeout();
private int _connectTimeout=30000;
private final Selector _selector;
private final Timeout _connectQ;
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
private int _bufferSize=64*1024;
private boolean _blockingConnect=false;
private String _protocol;
private int _maxIdleTime=-1;
private WebSocketBuffers _buffers;
public WebSocketClient(ThreadPool threadpool)
{
_threadPool=threadpool;
addBean(_selector);
addBean(_threadPool);
}
public WebSocketClient()
{
this(new QueuedThreadPool());
}
public WebSocketClient(ThreadPool threadpool)
{
_root=this;
_parent=null;
_threadPool=threadpool;
_selector=new Selector();
_connectQ=new Timeout();
addBean(_selector);
addBean(_threadPool);
}
public WebSocketClient(WebSocketClient parent)
{
_root=parent._root;
_parent=parent;
_threadPool=parent._threadPool;
_selector=parent._selector;
_connectQ=new Timeout();
_parent.addBean(this);
}
public SelectorManager getSelectorManager()
{
return _selector;
@ -69,26 +96,14 @@ public class WebSocketClient extends AggregateLifeCycle
return _threadPool;
}
public int getConnectTimeout()
{
return _connectTimeout;
}
public void setConnectTimeout(int connectTimeout)
{
if (isRunning())
throw new IllegalStateException(getState());
_connectTimeout = connectTimeout;
}
public int getMaxIdleTime()
{
return (int)_selector.getMaxIdleTime();
return _maxIdleTime;
}
public void setMaxIdleTime(int maxIdleTime)
{
_selector.setMaxIdleTime(maxIdleTime);
_maxIdleTime=maxIdleTime;
}
public int getBufferSize()
@ -98,38 +113,70 @@ public class WebSocketClient extends AggregateLifeCycle
public void setBufferSize(int bufferSize)
{
if (isRunning())
throw new IllegalStateException(getState());
_bufferSize = bufferSize;
}
public boolean isBlockingConnect()
public String getProtocol()
{
return _blockingConnect;
return _protocol;
}
public void setBlockingConnect(boolean blockingConnect)
public void setProtocol(String protocol)
{
_blockingConnect = blockingConnect;
_protocol = protocol;
}
@Override
protected void doStart() throws Exception
{
if (_parent!=null && !_parent.isRunning())
throw new IllegalStateException("parent:"+getState());
_buffers = new WebSocketBuffers(_bufferSize);
super.doStart();
for (int i=0;i<_selector.getSelectSets();i++)
// Start a selector and timer if this is the root client
if (_parent==null)
{
final int id=i;
_threadPool.dispatch(new Runnable(){
for (int i=0;i<_selector.getSelectSets();i++)
{
final int id=i;
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
}
catch (IOException e)
{
__log.warn(e);
}
}
}
});
}
_connectQ.setDuration(0);
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
_selector.doSelect(id);
Thread.sleep(200); // TODO configure?
_connectQ.tick(System.currentTimeMillis());
}
catch (IOException e)
catch(Exception e)
{
__log.warn(e);
}
@ -137,43 +184,24 @@ public class WebSocketClient extends AggregateLifeCycle
}
});
}
_connectQ.setDuration(_connectTimeout);
_threadPool.dispatch(new Runnable(){
public void run()
{
while(isRunning())
{
try
{
Thread.sleep(200); // TODO configure?
_connectQ.tick(System.currentTimeMillis());
}
catch(Exception e)
{
__log.warn(e);
}
}
}
});
}
public void open(URI uri, WebSocket websocket) throws IOException
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
{
open(uri,websocket,null,(int)_selector.getMaxIdleTime(),null,null);
try
{
return open(uri,websocket).get(maxConnectTime,units);
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
throw new RuntimeException(cause);
}
}
public void open(URI uri, WebSocket websocket, String protocol,int maxIdleTime) throws IOException
{
open(uri,websocket,protocol,(int)_selector.getMaxIdleTime(),null,null);
}
public void open(URI uri, WebSocket websocket, String protocol,int maxIdleTime,Map<String,String> cookies) throws IOException
{
open(uri,websocket,protocol,(int)_selector.getMaxIdleTime(),cookies,null);
}
public void open(URI uri, WebSocket websocket, String protocol,int maxIdleTime,Map<String,String> cookies,List<String> extensions) throws IOException
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
@ -185,36 +213,21 @@ public class WebSocketClient extends AggregateLifeCycle
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
channel.socket().setSoTimeout(getMaxIdleTime());
int maxIdleTime = getMaxIdleTime();
if (maxIdleTime<0)
maxIdleTime=(int)_selector.getMaxIdleTime();
if (maxIdleTime>0)
channel.socket().setSoTimeout(maxIdleTime);
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
WebSocketHolder holder=new WebSocketHolder(websocket,uri,protocol,maxIdleTime,cookies,extensions,channel);
final WebSocketHolder holder=new WebSocketHolder(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
_connectQ.schedule(holder);
boolean thrown=true;
try
{
if (isBlockingConnect())
{
channel.socket().connect(address,0);
channel.configureBlocking(false);
}
else
{
channel.configureBlocking(false);
channel.connect(address);
}
channel.configureBlocking(false);
channel.connect(address);
_selector.register( channel, holder);
_selector.register( channel, holder);
thrown=false;
}
finally
{
if (thrown)
holder.cancel();
}
return holder;
}
@ -242,11 +255,13 @@ public class WebSocketClient extends AggregateLifeCycle
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO expose on outer class
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
}
@Override
@ -264,8 +279,8 @@ public class WebSocketClient extends AggregateLifeCycle
{
__log.debug(ex);
WebSocketHolder holder = (WebSocketHolder)attachment;
holder.cancel();
holder.getWebSocket().onError(ex.toString(),ex);
holder.handshakeFailed(ex);
}
}
}
@ -329,15 +344,18 @@ public class WebSocketClient extends AggregateLifeCycle
}
});
String path=_holder.getURI().getPath();
if (path==null || path.length()==0)
path="/";
String request=
"GET "+_holder.getURI().getPath()+" HTTP/1.1\r\n"+
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+holder.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Version: 8\r\n";
"Sec-WebSocket-Version: "+WebSocketConnectionD10.VERSION+"\r\n";
if (holder.getProtocol()!=null)
request+="Sec-WebSocket-Protocol: "+holder.getProtocol()+"\r\n";
@ -357,16 +375,16 @@ public class WebSocketClient extends AggregateLifeCycle
try
{
Buffer handshake = new View(new ByteArrayBuffer(request));
Buffer handshake = new ByteArrayBuffer(request,false);
int len=handshake.length();
if (len!=_endp.flush(handshake))
throw new IOException("incomplete");
}
catch(IOException e)
{
__log.debug(e);
_holder.getWebSocket().onError("Handshake failed",e);
holder.handshakeFailed(e);
}
}
public Connection handle() throws IOException
@ -376,8 +394,7 @@ public class WebSocketClient extends AggregateLifeCycle
switch (_parser.parseAvailable())
{
case -1:
_holder.cancel();
_holder.getWebSocket().onError("EOF",new EOFException());
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
@ -400,10 +417,8 @@ public class WebSocketClient extends AggregateLifeCycle
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
if (_holder.getWebSocket() instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_holder.getWebSocket()).onHandshake((WebSocket.FrameConnection)connection.getConnection());
_holder.cancel();
_holder.getWebSocket().onOpen(connection.getConnection());
_holder.onConnection(connection);
return connection;
}
}
@ -424,13 +439,22 @@ public class WebSocketClient extends AggregateLifeCycle
public void closed()
{
_holder.cancel();
_holder.getWebSocket().onError(_error==null?"EOF":_error,null);
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
}
}
class ProtocolException extends IOException
{
ProtocolException(String reason)
{
super(reason);
}
}
class WebSocketHolder extends Timeout.Task
class WebSocketHolder implements Future<WebSocket.Connection>
{
final WebSocket _websocket;;
final URI _uri;
@ -438,7 +462,20 @@ public class WebSocketClient extends AggregateLifeCycle
final int _maxIdleTime;
final Map<String,String> _cookies;
final List<String> _extensions;
final ByteChannel _channel;
final CountDownLatch _latch = new CountDownLatch(1);
ByteChannel _channel;
WebSocketConnection _connection;
Throwable _exception;
final Timeout.Task _timeout = new Timeout.Task()
{
@Override
public void expired()
{
handshakeFailed(new IOException("expired"));
}
};
public WebSocketHolder(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
{
@ -451,6 +488,63 @@ public class WebSocketClient extends AggregateLifeCycle
_channel=channel;
}
public void onConnection(WebSocketConnection connection)
{
try
{
_timeout.cancel();
synchronized (this)
{
if (_channel!=null)
_connection=connection;
}
if (_connection!=null)
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onHandshake((WebSocket.FrameConnection)connection.getConnection());
_websocket.onOpen(connection.getConnection());
}
}
finally
{
_latch.countDown();
}
}
public void handshakeFailed(Throwable ex)
{
try
{
_timeout.cancel();
ByteChannel channel=null;
synchronized (this)
{
if (_channel!=null)
{
channel=_channel;
_channel=null;
_exception=ex;
}
}
if (channel!=null)
{
if (ex instanceof ProtocolException)
closeChannel(channel,WebSocketConnectionD10.CLOSE_PROTOCOL,ex.getMessage());
else
closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,ex.getMessage());
}
}
finally
{
_latch.countDown();
}
}
public Map<String,String> getCookies()
{
return _cookies;
@ -475,26 +569,116 @@ public class WebSocketClient extends AggregateLifeCycle
{
return _maxIdleTime;
}
@Override
public void expired()
{
try
{
__log.debug("expired "+this);
getWebSocket().onError("expired",null);
_channel.close();
}
catch(IOException e)
{
__log.ignore(e);
}
}
public String toString()
{
return "[" + _uri + ","+_websocket+"]@"+hashCode();
}
public boolean cancel(boolean mayInterruptIfRunning)
{
try
{
ByteChannel channel=null;
synchronized (this)
{
if (_connection==null && _exception==null && _channel!=null)
{
channel=_channel;
_channel=null;
}
}
if (channel!=null)
{
closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"cancelled");
return true;
}
return false;
}
finally
{
_latch.countDown();
}
}
public boolean isCancelled()
{
synchronized (this)
{
return _channel==null && _connection==null;
}
}
public boolean isDone()
{
synchronized (this)
{
return _connection!=null && _exception==null;
}
}
public org.eclipse.jetty.websocket.WebSocket.Connection get() throws InterruptedException, ExecutionException
{
try
{
return get(Long.MAX_VALUE,TimeUnit.SECONDS);
}
catch(TimeoutException e)
{
throw new IllegalStateException("The universe has ended",e);
}
}
public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException
{
_latch.await(timeout,unit);
ByteChannel channel=null;
org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
Throwable exception=null;
synchronized (this)
{
exception=_exception;
if (_connection==null)
{
exception=_exception;
channel=_channel;
_channel=null;
}
else
connection=_connection.getConnection();
}
if (channel!=null)
closeChannel(channel,WebSocketConnectionD10.CLOSE_NOCLOSE,"timeout");
if (exception!=null)
throw new ExecutionException(exception);
if (connection!=null)
return connection;
throw new TimeoutException();
}
private void closeChannel(ByteChannel channel,int code, String message)
{
try
{
_websocket.onClose(code,message);
}
catch(Exception e)
{
__log.warn(e);
}
try
{
channel.close();
}
catch(IOException e)
{
__log.debug(e);
}
}
}
}

View File

@ -17,4 +17,6 @@ public interface WebSocketConnection extends Connection
void handshake(HttpServletRequest request, HttpServletResponse response, String origin, String subprotocol) throws IOException;
List<Extension> getExtensions();
WebSocket.Connection getConnection();
}

View File

@ -87,6 +87,13 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
}
}
/* ------------------------------------------------------------ */
public org.eclipse.jetty.websocket.WebSocket.Connection getConnection()
{
return this;
}
/* ------------------------------------------------------------ */
public void setHixieKeys(String key1,String key2)
{

View File

@ -60,6 +60,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
final static int CLOSE_NOCLOSE=1006;
final static int CLOSE_NOTUTF8=1007;
final static int VERSION=8;
static boolean isLastFrame(byte flags)
{
return (flags&0x8)!=0;

View File

@ -7,6 +7,7 @@ import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
@ -14,7 +15,10 @@ import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -28,20 +32,20 @@ import org.junit.Test;
public class WebSocketClientTest
{
private ServerSocket server;
private int serverPort;
private ServerSocket _server;
private int _serverPort;
@Before
public void startServer() throws IOException {
server = new ServerSocket();
server.bind(null);
serverPort = server.getLocalPort();
_server = new ServerSocket();
_server.bind(null);
_serverPort = _server.getLocalPort();
}
@After
public void stopServer() throws IOException {
if(server != null) {
server.close();
if(_server != null) {
_server.close();
}
}
@ -61,10 +65,6 @@ public class WebSocketClientTest
{
open.set(true);
}
public void onError(String message, Throwable ex)
{
}
public void onClose(int closeCode, String message)
{}
@ -79,548 +79,287 @@ public class WebSocketClientTest
Assert.assertTrue(bad);
Assert.assertFalse(open.get());
}
@Test
public void testBlockingConnectionRefused() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
client.setBlockingConnect(true);
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
try
{
client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onError(String message, Throwable ex)
{
}
public void onClose(int closeCode, String message)
{}
});
Assert.fail();
}
catch(IOException e)
{
bad=true;
}
Assert.assertTrue(bad);
Assert.assertFalse(open.get());
}
@Test
public void testAsyncConnectionRefused() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setConnectTimeout(1000);
client.start();
client.setBlockingConnect(false);
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
});
Throwable error=null;
try
{
client.open(new URI("ws://127.0.0.1:1"),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
future.get(1,TimeUnit.SECONDS);
Assert.fail();
}
catch(IOException e)
catch(ExecutionException e)
{
bad=true;
error=e.getCause();
}
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof ConnectException);
}
@Test
public void testBlockingConnectionNotAccepted() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setConnectTimeout(500);
client.setBlockingConnect(true);
client.start();
boolean bad=false;
final AtomicReference<String> error = new AtomicReference<String>(null);
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
latch.countDown();
}
});
}
catch(IOException e)
{
e.printStackTrace();
bad=true;
}
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertTrue(bad||error.get()!=null);
}
@Test
public void testAsyncConnectionNotAccepted() throws Exception
public void testConnectionNotAccepted() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(300);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
});
Throwable error=null;
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(IOException e)
catch(TimeoutException e)
{
bad=true;
error=e;
}
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
}
@Test
public void testBlockingConnectionTimeout() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setConnectTimeout(500);
client.setBlockingConnect(true);
client.start();
boolean bad=false;
final AtomicReference<String> error = new AtomicReference<String>(null);
final CountDownLatch latch = new CountDownLatch(1);
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
latch.countDown();
}
});
}
catch(IOException e)
{
e.printStackTrace();
bad=true;
}
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException);
Assert.assertNotNull(server.accept());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertTrue(bad||error.get()!=null);
}
@Test
public void testAsyncConnectionTimeout() throws Exception
public void testConnectionTimeout() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(300);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
});
Assert.assertNotNull(_server.accept());
Throwable error=null;
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(IOException e)
catch(TimeoutException e)
{
bad=true;
error=e;
}
Assert.assertNotNull(server.accept());
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException);
}
@Test
public void testBadHandshake() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(300);
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
public void onOpen(Connection connection)
{
new Throwable().printStackTrace();
System.out.printf("onOpen(%s)%n", connection);
System.out.flush();
// TODO I don't think we should be seeing onOpen called on the
// bad handshake because the error here should mean that there is no
// websocket, so no onOpen call
// what we are seeing is the onOpen is intermittently showing up before the
// onError which triggers the countdown latch and the error message is null
// at that point.
open.set(true);
latch.countDown();
}
public void onError(String message, Throwable ex)
{
System.out.printf("onError(%s, %s)%n", message, ex);
System.out.flush();
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
System.out.printf("onClose(%d, %s)%n", closeCode, message);
System.out.flush();
close.set(closeCode);
latch.countDown();
}
});
Socket connection = server.accept();
Socket connection = _server.accept();
respondToClient(connection, "HTTP/1.1 404 NOT FOUND\r\n\r\n");
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(10,TimeUnit.SECONDS));
Assert.assertThat("error.get()", error.get(), containsString("404 NOT FOUND"));
}
private void respondToClient(Socket connection, String serverResponse) throws IOException
{
InputStream in = null;
InputStreamReader isr = null;
BufferedReader buf = null;
OutputStream out = null;
try {
in = connection.getInputStream();
isr = new InputStreamReader(in);
buf = new BufferedReader(isr);
String line;
while((line = buf.readLine())!=null)
{
System.err.println(line);
if(line.length() == 0)
{
// Got the "\r\n" line.
break;
}
}
// System.out.println("[Server-Out] " + serverResponse);
out = connection.getOutputStream();
out.write(serverResponse.getBytes());
out.flush();
}
finally
{
IO.close(buf);
IO.close(isr);
IO.close(in);
IO.close(out);
Throwable error=null;
try
{
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException e)
{
error=e.getCause();
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_PROTOCOL,close.get());
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND")>0);
}
@Test
public void testBadUpgrade() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
try
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
public void onOpen(Connection connection)
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
open.set(true);
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
}
});
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Socket connection = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
// System.err.println(line);
if (line.length()==0)
break;
}
connection.getOutputStream().write((
Socket connection = _server.accept();
respondToClient(connection,
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: rubbish\r\n" +
"\r\n").getBytes());
Assert.assertFalse(bad);
Assert.assertFalse(open.get());
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNotNull(error.get());
}
"\r\n" );
@Test
public void testUpgrade() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicReference<String> error = new AtomicReference<String>(null);
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(1);
Throwable error=null;
try
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
future.get(250,TimeUnit.MILLISECONDS);
Assert.fail();
}
catch(ExecutionException e)
{
error=e.getCause();
}
Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD10.CLOSE_PROTOCOL,close.get());
Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("Bad Sec-WebSocket-Accept")>=0);
}
@Test
public void testUpgradeThenTCPClose() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.start();
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
public void onOpen(Connection connection)
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
open.set(true);
}
public void onError(String message, Throwable ex)
{
error.set(message);
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertFalse(bad);
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
});
String key="not sent";
Socket connection = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
connection.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(latch.await(1,TimeUnit.SECONDS));
Assert.assertNull(error.get());
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
socket.close();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NOCLOSE,close.get());
}
@Test
public void testIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.setMaxIdleTime(500);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch latch = new CountDownLatch(2);
try
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket()
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket()
public void onOpen(Connection connection)
{
public void onOpen(Connection connection)
{
open.set(true);
latch.countDown();
}
open.set(true);
}
public void onError(String message, Throwable ex)
{
latch.countDown();
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
latch.countDown();
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertFalse(bad);
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
});
String key="not sent";
Socket connection = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
connection.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(latch.await(10,TimeUnit.SECONDS));
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
long start=System.currentTimeMillis();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(WebSocketConnectionD10.CLOSE_NORMAL,close.get());
}
@ -629,73 +368,41 @@ public class WebSocketClientTest
public void testNotIdle() throws Exception
{
WebSocketClient client = new WebSocketClient();
client.setBlockingConnect(true);
client.setConnectTimeout(10000);
client.setMaxIdleTime(500);
client.start();
boolean bad=false;
final AtomicBoolean open = new AtomicBoolean();
final Exchanger<Integer> close = new Exchanger<Integer>();
final CountDownLatch latch = new CountDownLatch(1);
final AtomicReference<WebSocket.Connection> connection = new AtomicReference<WebSocket.Connection>();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
final BlockingQueue<String> queue = new BlockingArrayQueue<String>();
try
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket.OnTextMessage()
{
client.open(new URI("ws://127.0.0.1:"+serverPort),new WebSocket.OnTextMessage()
public void onOpen(Connection connection)
{
public void onOpen(Connection c)
{
open.set(true);
connection.set(c);
latch.countDown();
}
open.set(true);
}
public void onError(String message, Throwable ex)
{
latch.countDown();
}
public void onClose(int closeCode, String message)
{
try
{
close.exchange(closeCode);
}
catch(InterruptedException ex)
{}
latch.countDown();
}
public void onMessage(String data)
{
queue.add(data);
}
});
}
catch(IOException e)
{
bad=true;
}
Assert.assertFalse(bad);
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
public void onMessage(String data)
{
queue.add(data);
}
});
String key="not sent";
Socket socket = server.accept();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
socket.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(latch.await(10,TimeUnit.SECONDS));
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
// Send some messages client to server
byte[] recv = new byte[1024];
@ -703,7 +410,7 @@ public class WebSocketClientTest
for (int i=0;i<10;i++)
{
Thread.sleep(250);
connection.get().sendMessage("Hello");
connection.sendMessage("Hello");
len=socket.getInputStream().read(recv,0,recv.length);
Assert.assertTrue(len>0);
}
@ -719,9 +426,68 @@ public class WebSocketClientTest
Assert.assertEquals("Hi",queue.poll(1,TimeUnit.SECONDS));
}
// Close with code
long start=System.currentTimeMillis();
socket.getOutputStream().write(new byte[]{(byte)0x88, (byte) 0x02, (byte)4, (byte)87 },0,4);
socket.getOutputStream().flush();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(1111,close.get());
Assert.assertEquals(new Integer(1111),close.exchange(null,1,TimeUnit.SECONDS));
}
private void respondToClient(Socket connection, String serverResponse) throws IOException
{
InputStream in = null;
InputStreamReader isr = null;
BufferedReader buf = null;
OutputStream out = null;
try {
in = connection.getInputStream();
isr = new InputStreamReader(in);
buf = new BufferedReader(isr);
String line;
while((line = buf.readLine())!=null)
{
// System.err.println(line);
if(line.length() == 0)
{
// Got the "\r\n" line.
break;
}
}
// System.out.println("[Server-Out] " + serverResponse);
out = connection.getOutputStream();
out.write(serverResponse.getBytes());
out.flush();
}
finally
{
IO.close(buf);
IO.close(isr);
IO.close(in);
IO.close(out);
}
}
private void accept(Socket connection) throws IOException
{
String key="not sent";
BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()));
for (String line=in.readLine();line!=null;line=in.readLine())
{
if (line.length()==0)
break;
if (line.startsWith("Sec-WebSocket-Key:"))
key=line.substring(18).trim();
}
connection.getOutputStream().write((
"HTTP/1.1 101 Upgrade\r\n" +
"Sec-WebSocket-Accept: "+ WebSocketConnectionD10.hashKey(key) +"\r\n" +
"\r\n").getBytes());
}
}

View File

@ -116,10 +116,6 @@ public class WebSocketLoadD10Test
{
this.outbound = outbound;
}
public void onError(String message,Throwable ex)
{
}
public void onMessage(String data)
{

View File

@ -249,10 +249,6 @@ public class WebSocketMessageD00Test
{
return latch.await(time, TimeUnit.MILLISECONDS);
}
public void onError(String message,Throwable ex)
{
}
public void onClose(int code,String message)
{

View File

@ -756,10 +756,6 @@ public class WebSocketMessageD06Test
{
this.connection = connection;
}
public void onError(String message,Throwable ex)
{
}
public void onOpen(Connection connection)
{

View File

@ -981,11 +981,6 @@ public class WebSocketMessageD10Test
{
return disconnected.await(time, TimeUnit.MILLISECONDS);
}
public void onError(String message,Throwable ex)
{
disconnected.countDown();
}
public void onClose(int code,String message)
{

View File

@ -67,12 +67,6 @@ public class WebSocketChatServlet extends WebSocketServlet
}
}
}
public void onError(String message,Throwable ex)
{
Log.warn(this+" onError",ex);
_members.remove(this);
}
public void onClose(int code, String message)
{