353073 Improved client javadoc

This commit is contained in:
Greg Wilkins 2011-08-11 12:18:36 +10:00
parent 3d36f07a62
commit d63294bfe4
15 changed files with 182 additions and 144 deletions

View File

@ -6,7 +6,6 @@ import java.util.Map;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.util.QuotedStringTokenizer;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.websocket.WebSocketParser.FrameHandler;
public class AbstractExtension implements Extension

View File

@ -8,7 +8,6 @@ import java.util.zip.Inflater;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.util.ByteArrayOutputStream2;
import org.eclipse.jetty.util.log.Log;
public class DeflateFrameExtension extends AbstractExtension

View File

@ -1,32 +1,18 @@
package org.eclipse.jetty.websocket;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.Connection;
import org.eclipse.jetty.websocket.WebSocket.FrameConnection;
/**
* @version $Revision$ $Date$

View File

@ -15,11 +15,8 @@ package org.eclipse.jetty.websocket;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.BuffersFactory;
import org.eclipse.jetty.io.Buffers.Type;
import org.eclipse.jetty.io.ThreadLocalBuffers;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.BuffersFactory;
/* ------------------------------------------------------------ */

View File

@ -35,8 +35,38 @@ import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.eclipse.jetty.util.thread.ThreadPool;
import org.eclipse.jetty.util.thread.Timeout;
/* ------------------------------------------------------------ */
/** WebSocket Client
* <p>This WebSocket Client class can create multiple websocket connections to multiple destinations.
* It uses the same {@link WebSocket} endpoint API as the server.
* Simple usage is as follows: <pre>
* WebSocketClient client = new WebSocketClient();
* client.setMaxIdleTime(500);
* client.start();
*
* WebSocket.Connection connection = client.open(new URI("ws://127.0.0.1:8080/"),new WebSocket.OnTextMessage()
* {
* public void onOpen(Connection connection)
* {
* // open notification
* }
*
* public void onClose(int closeCode, String message)
* {
* // close notification
* }
*
* public void onMessage(String data)
* {
* // handle incoming message
* }
* }).get(5,TimeUnit.SECONDS);
*
* connection.sendMessage("Hello World");
* </pre>
*/
public class WebSocketClient extends AggregateLifeCycle
{
private final static Logger __log = org.eclipse.jetty.util.log.Log.getLogger(WebSocketClient.class.getCanonicalName());
@ -46,13 +76,11 @@ public class WebSocketClient extends AggregateLifeCycle
private final WebSocketClient _root;
private final WebSocketClient _parent;
private final ThreadPool _threadPool;
private final Selector _selector;
private final Timeout _connectQ;
private final WebSocketClientSelector _selector;
private final Map<String,String> _cookies=new ConcurrentHashMap<String, String>();
private final List<String> _extensions=new CopyOnWriteArrayList<String>();
private int _bufferSize=64*1024;
private String _protocol;
private int _maxIdleTime=-1;
@ -60,57 +88,94 @@ public class WebSocketClient extends AggregateLifeCycle
private WebSocketBuffers _buffers;
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with default configuration.
*/
public WebSocketClient()
{
this(new QueuedThreadPool());
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client with shared threadpool.
* @param threadpool
*/
public WebSocketClient(ThreadPool threadpool)
{
_root=this;
_parent=null;
_threadPool=threadpool;
_selector=new Selector();
_connectQ=new Timeout();
_selector=new WebSocketClientSelector();
addBean(_selector);
addBean(_threadPool);
}
/* ------------------------------------------------------------ */
/** Create a WebSocket Client from another.
* <p>If multiple clients are required so that connections created may have different
* configurations, then it is more efficient to create a client based on another, so
* that the thread pool and IO infrastructure may be shared.
*/
public WebSocketClient(WebSocketClient parent)
{
_root=parent._root;
_parent=parent;
_threadPool=parent._threadPool;
_selector=parent._selector;
_connectQ=new Timeout();
_parent.addBean(this);
}
/* ------------------------------------------------------------ */
/**
* Get the selectorManager. Used to configure the manager.
* @return The {@link SelectorManager} instance.
*/
public SelectorManager getSelectorManager()
{
return _selector;
}
/* ------------------------------------------------------------ */
/** Get the ThreadPool.
* <p>Used to set/query the thread pool configuration.
* @return The {@link ThreadPool}
*/
public ThreadPool getThreadPool()
{
return _threadPool;
}
/* ------------------------------------------------------------ */
/** Get the maxIdleTime for connections opened by this client.
* @return The maxIdleTime in ms, or -1 if the default from {@link #getSelectorManager()} is used.
*/
public int getMaxIdleTime()
{
return _maxIdleTime;
}
/* ------------------------------------------------------------ */
/** Set the maxIdleTime for connections opened by this client.
* @param maxIdleTime max idle time in ms
*/
public void setMaxIdleTime(int maxIdleTime)
{
_maxIdleTime=maxIdleTime;
}
/* ------------------------------------------------------------ */
/** Get the WebSocket Buffer size for connections opened by this client.
* @return the buffer size in bytes.
*/
public int getBufferSize()
{
return _bufferSize;
}
/* ------------------------------------------------------------ */
/** Set the WebSocket Buffer size for connections opened by this client.
* @param bufferSize the buffer size in bytes.
*/
public void setBufferSize(int bufferSize)
{
if (isRunning())
@ -118,15 +183,90 @@ public class WebSocketClient extends AggregateLifeCycle
_bufferSize = bufferSize;
}
/* ------------------------------------------------------------ */
/** Get the subprotocol string for connections opened by this client.
* @return The subprotocol
*/
public String getProtocol()
{
return _protocol;
}
/* ------------------------------------------------------------ */
/** Set the subprotocol string for connections opened by this client.
* @param protocol The subprotocol
*/
public void setProtocol(String protocol)
{
_protocol = protocol;
}
/* ------------------------------------------------------------ */
/** Open a WebSocket connection.
* Open a websocket connection to the URI and block until the connection is accepted or there is an error.
* @param uri The URI to connect to.
* @param websocket The {@link WebSocket} instance to handle incoming events.
* @param maxConnectTime The interval to wait for a successful connection
* @param units the units of the maxConnectTime
* @return A {@link WebSocket.Connection}
* @throws IOException
* @throws InterruptedException
* @throws TimeoutException
*/
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
{
try
{
return open(uri,websocket).get(maxConnectTime,units);
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
throw new RuntimeException(cause);
}
}
/* ------------------------------------------------------------ */
/** Asynchronously open a websocket connection.
* Open a websocket connection and return a {@link Future} to obtain the connection.
* The caller must call {@link Future#get(long, TimeUnit)} if they wish to impose a connect timeout on the open.
*
* @param uri The URI to connect to.
* @param websocket The {@link WebSocket} instance to handle incoming events.
* @return A {@link Future} to the {@link WebSocket.Connection}
* @throws IOException
*/
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
String scheme=uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
if ("wss".equalsIgnoreCase(scheme))
throw new IOException("wss not supported");
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
int maxIdleTime = getMaxIdleTime();
if (maxIdleTime<0)
maxIdleTime=(int)_selector.getMaxIdleTime();
if (maxIdleTime>0)
channel.socket().setSoTimeout(maxIdleTime);
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
channel.configureBlocking(false);
channel.connect(address);
_selector.register( channel, holder);
return holder;
}
@Override
protected void doStart() throws Exception
@ -162,83 +302,14 @@ public class WebSocketClient extends AggregateLifeCycle
}
});
}
_connectQ.setDuration(0);
_threadPool.dispatch(new Runnable()
{
public void run()
{
while(isRunning())
{
try
{
Thread.sleep(200); // TODO configure?
_connectQ.tick(System.currentTimeMillis());
}
catch(InterruptedException e)
{
if (isRunning())
__log.warn(e);
else
__log.ignore(e);
}
catch(Exception e)
{
__log.warn(e);
}
}
}
});
}
}
public WebSocket.Connection open(URI uri, WebSocket websocket,long maxConnectTime,TimeUnit units) throws IOException, InterruptedException, TimeoutException
{
try
{
return open(uri,websocket).get(maxConnectTime,units);
}
catch (ExecutionException e)
{
Throwable cause = e.getCause();
if (cause instanceof IOException)
throw (IOException)cause;
throw new RuntimeException(cause);
}
}
public Future<WebSocket.Connection> open(URI uri, WebSocket websocket) throws IOException
{
if (!isStarted())
throw new IllegalStateException("!started");
String scheme=uri.getScheme();
if (!("ws".equalsIgnoreCase(scheme) || "wss".equalsIgnoreCase(scheme)))
throw new IllegalArgumentException("Bad WebSocket scheme '"+scheme+"'");
if ("wss".equalsIgnoreCase(scheme))
throw new IOException("wss not supported");
SocketChannel channel = SocketChannel.open();
channel.socket().setTcpNoDelay(true);
int maxIdleTime = getMaxIdleTime();
if (maxIdleTime<0)
maxIdleTime=(int)_selector.getMaxIdleTime();
if (maxIdleTime>0)
channel.socket().setSoTimeout(maxIdleTime);
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
final WebSocketHolder holder=new WebSocketHolder(websocket,uri,_protocol,maxIdleTime,_cookies,_extensions,channel);
channel.configureBlocking(false);
channel.connect(address);
_selector.register( channel, holder);
return holder;
}
class Selector extends SelectorManager
/* ------------------------------------------------------------ */
/** WebSocket Client Selector Manager
*/
class WebSocketClientSelector extends SelectorManager
{
@Override
public boolean dispatch(Runnable task)
@ -255,7 +326,7 @@ public class WebSocketClient extends AggregateLifeCycle
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)
{
WebSocketHolder holder = (WebSocketHolder) endpoint.getSelectionKey().attachment();
WebSocketFuture holder = (WebSocketFuture) endpoint.getSelectionKey().attachment();
return new HandshakeConnection(endpoint,holder);
}
@ -280,29 +351,34 @@ public class WebSocketClient extends AggregateLifeCycle
@Override
protected void connectionFailed(SocketChannel channel, Throwable ex, Object attachment)
{
if (!(attachment instanceof WebSocketHolder))
if (!(attachment instanceof WebSocketFuture))
super.connectionFailed(channel,ex,attachment);
else
{
__log.debug(ex);
WebSocketHolder holder = (WebSocketHolder)attachment;
WebSocketFuture holder = (WebSocketFuture)attachment;
holder.handshakeFailed(ex);
}
}
}
/* ------------------------------------------------------------ */
/** Handshake Connection.
* Handles the connection until the handshake succeeds or fails.
*/
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketHolder _holder;
private final WebSocketFuture _holder;
private final String _key;
private final HttpParser _parser;
private String _accept;
private String _error;
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketHolder holder)
public HandshakeConnection(SelectChannelEndPoint endpoint, WebSocketFuture holder)
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
@ -453,6 +529,10 @@ public class WebSocketClient extends AggregateLifeCycle
}
}
/* ------------------------------------------------------------ */
/** Exception recording a WebSocket handshake protocol exception.
*/
class ProtocolException extends IOException
{
ProtocolException(String reason)
@ -461,7 +541,11 @@ public class WebSocketClient extends AggregateLifeCycle
}
}
class WebSocketHolder implements Future<WebSocket.Connection>
/* ------------------------------------------------------------ */
/** The Future Websocket Connection.
*/
class WebSocketFuture implements Future<WebSocket.Connection>
{
final WebSocket _websocket;;
final URI _uri;
@ -469,22 +553,13 @@ public class WebSocketClient extends AggregateLifeCycle
final int _maxIdleTime;
final Map<String,String> _cookies;
final List<String> _extensions;
final CountDownLatch _latch = new CountDownLatch(1);
final CountDownLatch _done = new CountDownLatch(1);
ByteChannel _channel;
WebSocketConnection _connection;
Throwable _exception;
final Timeout.Task _timeout = new Timeout.Task()
{
@Override
public void expired()
{
handshakeFailed(new IOException("expired"));
}
};
public WebSocketHolder(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
public WebSocketFuture(WebSocket websocket, URI uri, String protocol, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
{
_websocket=websocket;
_uri=uri;
@ -499,8 +574,6 @@ public class WebSocketClient extends AggregateLifeCycle
{
try
{
_timeout.cancel();
synchronized (this)
{
if (_channel!=null)
@ -518,7 +591,7 @@ public class WebSocketClient extends AggregateLifeCycle
}
finally
{
_latch.countDown();
_done.countDown();
}
}
@ -526,7 +599,6 @@ public class WebSocketClient extends AggregateLifeCycle
{
try
{
_timeout.cancel();
ByteChannel channel=null;
synchronized (this)
{
@ -548,7 +620,7 @@ public class WebSocketClient extends AggregateLifeCycle
}
finally
{
_latch.countDown();
_done.countDown();
}
}
@ -605,7 +677,7 @@ public class WebSocketClient extends AggregateLifeCycle
}
finally
{
_latch.countDown();
_done.countDown();
}
}
@ -640,7 +712,7 @@ public class WebSocketClient extends AggregateLifeCycle
public org.eclipse.jetty.websocket.WebSocket.Connection get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException,
TimeoutException
{
_latch.await(timeout,unit);
_done.await(timeout,unit);
ByteChannel channel=null;
org.eclipse.jetty.websocket.WebSocket.Connection connection=null;
Throwable exception=null;

View File

@ -33,10 +33,10 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
{

View File

@ -33,10 +33,10 @@ import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocketGeneratorD10.MaskGen;
public class WebSocketConnectionD10 extends AbstractConnection implements WebSocketConnection

View File

@ -20,7 +20,6 @@ import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */

View File

@ -14,13 +14,11 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */

View File

@ -18,8 +18,6 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;

View File

@ -18,8 +18,6 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;

View File

@ -1,7 +1,5 @@
package org.eclipse.jetty.websocket;
import static org.hamcrest.CoreMatchers.*;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -10,18 +8,15 @@ import java.io.OutputStream;
import java.net.ConnectException;
import java.net.ServerSocket;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO;

View File

@ -25,7 +25,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.AfterClass;
import org.junit.BeforeClass;

View File

@ -22,7 +22,6 @@ import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;

View File

@ -26,7 +26,6 @@ import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;