Merge branch 'master' into jetty-8

This commit is contained in:
Jesse McConnell 2011-09-01 11:24:21 -05:00
commit 402352962d
11 changed files with 245 additions and 96 deletions

View File

@ -530,7 +530,7 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
// Job loop
while (job!=null && isRunning())
{
job.run();
runJob(job);
job=_jobs.poll();
}
@ -586,6 +586,27 @@ public class QueuedThreadPool extends AbstractLifeCycle implements SizedThreadPo
}
};
/* ------------------------------------------------------------ */
/**
* <p>Runs the given job in the {@link Thread#currentThread() current thread}.</p>
* <p>Subclasses may override to perform pre/post actions before/after the job is run.</p>
*
* @param job the job to run
*/
protected void runJob(Runnable job)
{
job.run();
}
/* ------------------------------------------------------------ */
/**
* @return the job queue
*/
protected BlockingQueue<Runnable> getQueue()
{
return _jobs;
}
/* ------------------------------------------------------------ */
/**
* @param id The thread ID to stop.

View File

@ -112,7 +112,6 @@ public interface WebSocket
void disconnect();
boolean isOpen();
/* ------------------------------------------------------------ */
/**
* @param ms The time in ms that the connection can be idle before closing
*/
@ -128,7 +127,6 @@ public interface WebSocket
*/
void setMaxBinaryMessageSize(int size);
/* ------------------------------------------------------------ */
/**
* @return The time in ms that the connection can be idle before closing
*/
@ -150,30 +148,113 @@ public interface WebSocket
/**
* Frame Level Connection
* <p>The Connection interface at the level of sending/receiving frames rather than messages.
* Also contains methods to decode/generate flags and opcodes without using constants, so that
* code can be written to work with multiple drafts of the protocol.
*
*/
public interface FrameConnection extends Connection
{
boolean isMessageComplete(byte flags);
/** Close the connection with specific closeCode and message.
* @param closeCode
* @param message
*/
void close(int closeCode,String message);
byte binaryOpcode();
byte textOpcode();
byte continuationOpcode();
byte finMask();
String getProtocol();
void setFakeFragments(boolean fake);
boolean isFakeFragments();
/**
* @return The opcode of a binary message
*/
byte binaryOpcode();
/**
* @return The opcode of a text message
*/
byte textOpcode();
/**
* @return The opcode of a continuation frame
*/
byte continuationOpcode();
/**
* @return Mask for the FIN bit.
*/
byte finMask();
/** Set if frames larger than the frame buffer are handled with local fragmentations
* @param allowFragmentation
*/
void setAllowFrameFragmentation(boolean allowFragmentation);
/**
* @param flags The flags bytes of a frame
* @return True of the flags indicate a final frame.
*/
boolean isMessageComplete(byte flags);
/**
* @param opcode
* @return True if the opcode is for a control frame
*/
boolean isControl(byte opcode);
/**
* @param opcode
* @return True if the opcode is for a text frame
*/
boolean isText(byte opcode);
/**
* @param opcode
* @return True if the opcode is for a binary frame
*/
boolean isBinary(byte opcode);
/**
* @param opcode
* @return True if the opcode is for a continuation frame
*/
boolean isContinuation(byte opcode);
/**
* @param opcode
* @return True if the opcode is a close control
*/
boolean isClose(byte opcode);
/**
* @param opcode
* @return True if the opcode is a ping control
*/
boolean isPing(byte opcode);
/**
* @param opcode
* @return True if the opcode is a pong control
*/
boolean isPong(byte opcode);
/**
* @return True if frames larger than the frame buffer are fragmented.
*/
boolean isAllowFrameFragmentation();
/** Send a control frame
* @param control
* @param data
* @param offset
* @param length
* @throws IOException
*/
void sendControl(byte control,byte[] data, int offset, int length) throws IOException;
/** Send an arbitrary frame
* @param flags
* @param opcode
* @param data
* @param offset
* @param length
* @throws IOException
*/
void sendFrame(byte flags,byte opcode,byte[] data, int offset, int length) throws IOException;
}
}

View File

@ -66,6 +66,8 @@ public class WebSocketClient
private String _origin;
private String _protocol;
private int _maxIdleTime=-1;
private int _maxTextMessageSize=16*1024;
private int _maxBinaryMessageSize=-1;
private MaskGen _maskGen;
private SocketAddress _bindAddress;
@ -227,6 +229,46 @@ public class WebSocketClient
_maskGen = maskGen;
}
/* ------------------------------------------------------------ */
/**
* @return The initial maximum text message size (in characters) for a connection
*/
public int getMaxTextMessageSize()
{
return _maxTextMessageSize;
}
/* ------------------------------------------------------------ */
/**
* Set the initial maximum text message size for a connection. This can be changed by
* the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}.
* @param maxTextMessageSize The default maximum text message size (in characters) for a connection
*/
public void setMaxTextMessageSize(int maxTextMessageSize)
{
_maxTextMessageSize = maxTextMessageSize;
}
/* ------------------------------------------------------------ */
/**
* @return The initial maximum binary message size (in bytes) for a connection
*/
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessageSize;
}
/* ------------------------------------------------------------ */
/**
* Set the initial maximum binary message size for a connection. This can be changed by
* the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}.
* @param maxTextMessageSize The default maximum binary message size (in bytes) for a connection
*/
public void setMaxBinaryMessageSize(int maxBinaryMessageSize)
{
_maxBinaryMessageSize = maxBinaryMessageSize;
}
/* ------------------------------------------------------------ */
/**
* <p>Opens a websocket connection to the URI and blocks until the connection is accepted or there is an error.</p>
@ -285,11 +327,10 @@ public class WebSocketClient
if (_bindAddress != null)
channel.socket().bind(_bindAddress);
channel.socket().setTcpNoDelay(true);
int maxIdleTime = getMaxIdleTime();
InetSocketAddress address=new InetSocketAddress(uri.getHost(),uri.getPort());
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,_protocol,_origin,_maskGen,maxIdleTime,_cookies,_extensions,channel);
final WebSocketFuture holder=new WebSocketFuture(websocket,uri,this,channel);
channel.configureBlocking(false);
channel.connect(address);
@ -309,6 +350,8 @@ public class WebSocketClient
final String _origin;
final MaskGen _maskGen;
final int _maxIdleTime;
final int _maxTextMessageSize;
final int _maxBinaryMessageSize;
final Map<String,String> _cookies;
final List<String> _extensions;
final CountDownLatch _done = new CountDownLatch(1);
@ -317,16 +360,18 @@ public class WebSocketClient
WebSocketConnection _connection;
Throwable _exception;
private WebSocketFuture(WebSocket websocket, URI uri, String protocol, String origin, MaskGen maskGen, int maxIdleTime, Map<String,String> cookies,List<String> extensions, ByteChannel channel)
private WebSocketFuture(WebSocket websocket, URI uri, WebSocketClient client, ByteChannel channel)
{
_websocket=websocket;
_uri=uri;
_protocol=protocol;
_origin=origin;
_maskGen=maskGen;
_maxIdleTime=maxIdleTime;
_cookies=cookies;
_extensions=extensions;
_protocol=client._protocol;
_origin=client._origin;
_maskGen=client._maskGen;
_maxIdleTime=client._maxIdleTime;
_maxTextMessageSize=client._maxTextMessageSize;
_maxBinaryMessageSize=client._maxBinaryMessageSize;
_cookies=client._cookies;
_extensions=client._extensions;
_channel=channel;
}
@ -334,6 +379,9 @@ public class WebSocketClient
{
try
{
connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize);
connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize);
synchronized (this)
{
if (_channel!=null)

View File

@ -258,7 +258,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
class HandshakeConnection extends AbstractConnection
{
private final SelectChannelEndPoint _endp;
private final WebSocketClient.WebSocketFuture _holder;
private final WebSocketClient.WebSocketFuture _future;
private final String _key;
private final HttpParser _parser;
private String _accept;
@ -268,7 +268,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
{
super(endpoint,System.currentTimeMillis());
_endp=endpoint;
_holder=future;
_future=future;
byte[] bytes=new byte[16];
__random.nextBytes(bytes);
@ -314,7 +314,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
}
});
String path=_holder.getURI().getPath();
String path=_future.getURI().getPath();
if (path==null || path.length()==0)
path="/";
@ -322,7 +322,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
String request=
"GET "+path+" HTTP/1.1\r\n"+
"Host: "+future.getURI().getHost()+":"+_holder.getURI().getPort()+"\r\n"+
"Host: "+future.getURI().getHost()+":"+_future.getURI().getPort()+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+_key+"\r\n"+
@ -366,7 +366,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
switch (_parser.parseAvailable())
{
case -1:
_holder.handshakeFailed(new IOException("Incomplete handshake response"));
_future.handshakeFailed(new IOException("Incomplete handshake response"));
return this;
case 0:
return this;
@ -383,14 +383,14 @@ public class WebSocketClientFactory extends AggregateLifeCycle
else
{
Buffer header=_parser.getHeaderBuffer();
MaskGen maskGen=_holder.getMaskGen();
WebSocketConnectionD13 connection = new WebSocketConnectionD13(_holder.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_holder.getMaxIdleTime(),_holder.getProtocol(),null,10,maskGen);
MaskGen maskGen=_future.getMaskGen();
WebSocketConnectionD13 connection = new WebSocketConnectionD13(_future.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_future.getMaxIdleTime(),_future.getProtocol(),null,10,maskGen);
if (header.hasContent())
connection.fillBuffersFrom(header);
_buffers.returnBuffer(header);
_holder.onConnection(connection);
_future.onConnection(connection);
return connection;
}
@ -413,9 +413,9 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public void closed()
{
if (_error!=null)
_holder.handshakeFailed(new ProtocolException(_error));
_future.handshakeFailed(new ProtocolException(_error));
else
_holder.handshakeFailed(new EOFException());
_future.handshakeFailed(new EOFException());
}
}
}

View File

@ -557,11 +557,11 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
return 0;
}
public void setFakeFragments(boolean fake)
public void setAllowFrameFragmentation(boolean allowFragmentation)
{
}
public boolean isFakeFragments()
public boolean isAllowFrameFragmentation()
{
return false;
}

View File

@ -514,11 +514,11 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
}
public void setFakeFragments(boolean fake)
public void setAllowFrameFragmentation(boolean allowFragmentation)
{
}
public boolean isFakeFragments()
public boolean isAllowFrameFragmentation()
{
return false;
}

View File

@ -588,13 +588,13 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc
}
/* ------------------------------------------------------------ */
public void setFakeFragments(boolean fake)
public void setAllowFrameFragmentation(boolean allowFragmentation)
{
_parser.setFakeFragments(fake);
_parser.setFakeFragments(allowFragmentation);
}
/* ------------------------------------------------------------ */
public boolean isFakeFragments()
public boolean isAllowFrameFragmentation()
{
return _parser.isFakeFragments();
}
@ -727,7 +727,7 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
else
{
LOG.warn("Frame discarded. Text aggregation disabed for {}",_endp);
LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
_connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Text frame aggregation disabled");
}
}

View File

@ -592,13 +592,13 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
}
/* ------------------------------------------------------------ */
public void setFakeFragments(boolean fake)
public void setAllowFrameFragmentation(boolean allowFragmentation)
{
_parser.setFakeFragments(fake);
_parser.setFakeFragments(allowFragmentation);
}
/* ------------------------------------------------------------ */
public boolean isFakeFragments()
public boolean isAllowFrameFragmentation()
{
return _parser.isFakeFragments();
}
@ -616,7 +616,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
private class WSFrameHandler implements WebSocketParser.FrameHandler
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(512); // TODO configure initial capacity
private ByteArrayBuffer _aggregate;
private byte _opcode=-1;
@ -731,7 +731,7 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
else
{
LOG.warn("Frame discarded. Text aggregation disabed for {}",_endp);
LOG.warn("Frame discarded. Text aggregation disabled for {}",_endp);
_connection.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled");
}
}

View File

@ -53,7 +53,6 @@ public class WebSocketClientTest
_factory.stop();
}
@Ignore
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
@ -73,7 +72,7 @@ public class WebSocketClientTest
public void onMessage(String data)
{
System.out.println("data = " + data);
// System.out.println("data = " + data);
dataLatch.countDown();
}

View File

@ -663,7 +663,7 @@ public class WebSocketMessageD12Test
assertNotNull(__serverWebSocket.connection);
__serverWebSocket.getConnection().setMaxTextMessageSize(10*1024);
__serverWebSocket.getConnection().setFakeFragments(true);
__serverWebSocket.getConnection().setAllowFrameFragmentation(true);
output.write(0x81);
output.write(0x80|0x7E);

View File

@ -663,7 +663,7 @@ public class WebSocketMessageD13Test
assertNotNull(__serverWebSocket.connection);
__serverWebSocket.getConnection().setMaxTextMessageSize(10*1024);
__serverWebSocket.getConnection().setFakeFragments(true);
__serverWebSocket.getConnection().setAllowFrameFragmentation(true);
output.write(0x81);
output.write(0x80|0x7E);