Merge branch 'master' into release

This commit is contained in:
Jesse McConnell 2011-08-31 13:55:18 -05:00
commit b6abe647aa
12 changed files with 697 additions and 147 deletions

4
.gitignore vendored
View File

@ -9,4 +9,6 @@ target/
*.diff *.diff
*.patch *.patch
*.iml *.iml
.idea *.ipr
*.iws
.idea/

View File

@ -547,7 +547,7 @@ public class HttpExchangeTest
httpExchange.setURL(_scheme+"://localhost:"+_port); httpExchange.setURL(_scheme+"://localhost:"+_port);
httpExchange.setRequestURI("*"); httpExchange.setRequestURI("*");
httpExchange.setMethod(HttpMethods.OPTIONS); httpExchange.setMethod(HttpMethods.OPTIONS);
httpExchange.setRequestHeader("Connection","close"); // httpExchange.setRequestHeader("Connection","close");
_httpClient.send(httpExchange); _httpClient.send(httpExchange);
int state = httpExchange.waitForDone(); int state = httpExchange.waitForDone();

View File

@ -26,7 +26,7 @@
<name>Jetty :: Monitoring</name> <name>Jetty :: Monitoring</name>
<description>Performance monitoring artifact for jetty.</description> <description>Performance monitoring artifact for jetty.</description>
<properties> <properties>
<bundle-symbolic-name>${project.groupId}.jmx</bundle-symbolic-name> <bundle-symbolic-name>${project.groupId}.monitor</bundle-symbolic-name>
</properties> </properties>
<build> <build>
<plugins> <plugins>
@ -83,39 +83,11 @@
<forkMode>always</forkMode> <forkMode>always</forkMode>
</configuration> </configuration>
</plugin> </plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>unpack-jetty-distro</id>
<phase>process-test-resources</phase>
<goals>
<goal>unpack</goal>
</goals>
<configuration>
<artifactItems>
<artifactItem>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-distribution</artifactId>
<version>${project.version}</version>
<type>zip</type>
<overWrite>true</overWrite>
</artifactItem>
</artifactItems>
<outputAbsoluteArtifactFilename>true</outputAbsoluteArtifactFilename>
<outputDirectory>${test-dist-dir}</outputDirectory>
<overWriteSnapshots>true</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
</configuration>
</execution>
</executions>
</plugin>
<plugin> <plugin>
<groupId>org.codehaus.mojo</groupId> <groupId>org.codehaus.mojo</groupId>
<artifactId>findbugs-maven-plugin</artifactId> <artifactId>findbugs-maven-plugin</artifactId>
<configuration> <configuration>
<onlyAnalyze>org.eclipse.jetty.jmx.*</onlyAnalyze> <onlyAnalyze>org.eclipse.jetty.monitor.*</onlyAnalyze>
</configuration> </configuration>
</plugin> </plugin>
</plugins> </plugins>

View File

@ -10,7 +10,8 @@
<properties> <properties>
<jetty-version>${project.version}</jetty-version> <jetty-version>${project.version}</jetty-version>
<junit4-version>${junit-version}</junit4-version> <junit4-version>${junit-version}</junit4-version>
<bundle-symbolic-name>${project.groupId}.mongodb</bundle-symbolic-name> </properties> <bundle-symbolic-name>${project.groupId}.nosql</bundle-symbolic-name>
</properties>
<build> <build>
<defaultGoal>install</defaultGoal> <defaultGoal>install</defaultGoal>
<plugins> <plugins>

View File

@ -112,6 +112,8 @@ public interface WebSocket
void disconnect(); void disconnect();
boolean isOpen(); boolean isOpen();
void setMaxIdleTime(int ms);
/** /**
* @param size size<0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters * @param size size<0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters
*/ */

View File

@ -413,6 +413,18 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
{ {
} }
public void setMaxIdleTime(int ms)
{
try
{
_endp.setMaxIdleTime(ms);
}
catch(IOException e)
{
LOG.warn(e);
}
}
public void setMaxBinaryMessageSize(int size) public void setMaxBinaryMessageSize(int size)
{ {
} }

View File

@ -387,6 +387,19 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
WebSocketConnectionD06.this.closeOut(code,message); WebSocketConnectionD06.this.closeOut(code,message);
} }
/* ------------------------------------------------------------ */
public void setMaxIdleTime(int ms)
{
try
{
_endp.setMaxIdleTime(ms);
}
catch(IOException e)
{
LOG.warn(e);
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setMaxTextMessageSize(int size) public void setMaxTextMessageSize(int size)
{ {

View File

@ -277,7 +277,7 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc
public void idleExpired() public void idleExpired()
{ {
long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp(); long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp();
closeOut(WebSocketConnectionD12.CLOSE_NORMAL,"Idle for "+idle+"ms"); closeOut(WebSocketConnectionD12.CLOSE_NORMAL,"Idle for "+idle+"ms > "+_endp.getMaxIdleTime()+"ms");
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -475,6 +475,19 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc
WebSocketConnectionD12.this.closeOut(code,message); WebSocketConnectionD12.this.closeOut(code,message);
} }
/* ------------------------------------------------------------ */
public void setMaxIdleTime(int ms)
{
try
{
_endp.setMaxIdleTime(ms);
}
catch(IOException e)
{
LOG.warn(e);
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void setMaxTextMessageSize(int size) public void setMaxTextMessageSize(int size)
{ {
@ -770,6 +783,7 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc
int max = _connection.getMaxBinaryMessageSize(); int max = _connection.getMaxBinaryMessageSize();
if (max>0 && (bufferLen+length)>max) if (max>0 && (bufferLen+length)>max)
{ {
LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp);
_connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize()); _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Message size > "+_connection.getMaxBinaryMessageSize());
_opcode=-1; _opcode=-1;
if (_aggregate!=null) if (_aggregate!=null)
@ -781,6 +795,7 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc
private void textMessageTooLarge() private void textMessageTooLarge()
{ {
LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp);
_connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_opcode=-1; _opcode=-1;

View File

@ -11,18 +11,22 @@ import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future; import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
public class WebSocketClientTest public class WebSocketClientTest
@ -49,6 +53,56 @@ public class WebSocketClientTest
_factory.stop(); _factory.stop();
} }
@Ignore
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
int bufferSize = 512;
WebSocketClientFactory factory = new WebSocketClientFactory(new QueuedThreadPool(), new ZeroMaskGen(), bufferSize);
factory.start();
WebSocketClient client = new WebSocketClient(factory);
final CountDownLatch openLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
WebSocket.OnTextMessage websocket = new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
openLatch.countDown();
}
public void onMessage(String data)
{
System.out.println("data = " + data);
dataLatch.countDown();
}
public void onClose(int closeCode, String message)
{
}
};
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"), websocket);
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(openLatch.await(1, TimeUnit.SECONDS));
OutputStream serverOutput = socket.getOutputStream();
int length = bufferSize + bufferSize / 2;
serverOutput.write(0x80 | 0x01); // FIN + TEXT
serverOutput.write(0x7E); // No MASK and 2 bytes length
serverOutput.write(length >> 8); // first length byte
serverOutput.write(length & 0xFF); // second length byte
for (int i = 0; i < length; ++i)
serverOutput.write('x');
serverOutput.flush();
Assert.assertTrue(dataLatch.await(1000, TimeUnit.SECONDS));
factory.stop();
}
@Test @Test
public void testBadURL() throws Exception public void testBadURL() throws Exception
{ {
@ -429,6 +483,216 @@ public class WebSocketClientTest
} }
@Test
public void testBlockSending() throws Exception
{
WebSocketClient client = new WebSocketClient(_factory);
client.setMaxIdleTime(10000);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onClose(int closeCode, String message)
{
close.set(closeCode);
_latch.countDown();
}
public void onMessage(String data)
{
}
});
final Socket socket = _server.accept();
accept(socket);
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
final int messages=20000;
final AtomicLong totalB=new AtomicLong();
Thread consumer = new Thread()
{
public void run()
{
try
{
Thread.sleep(2000);
byte[] recv = new byte[32*1024];
int len=0;
while (len>=0)
{
totalB.addAndGet(len);
len=socket.getInputStream().read(recv,0,recv.length);
Thread.sleep(10);
}
}
catch(InterruptedException e)
{
return;
}
catch(Exception e)
{
e.printStackTrace();
}
}
};
consumer.start();
// Send lots of messages client to server
long max=0;
long start=System.currentTimeMillis();
String mesg="This is a test message to send";
for (int i=0;i<messages;i++)
{
connection.sendMessage("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXX");
if (i%100==0)
{
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (duration>max)
max=duration;
}
}
// wait for consumer to complete
while (totalB.get()<messages*(mesg.length()+6L))
Thread.sleep(10);
Assert.assertTrue(max>1000); // writing was blocked
Assert.assertEquals(messages*(mesg.length()+6L),totalB.get());
consumer.interrupt();
}
@Test
public void testBlockReceiving() throws Exception
{
WebSocketClient client = new WebSocketClient(_factory);
client.setMaxIdleTime(60000);
final AtomicBoolean open = new AtomicBoolean();
final AtomicInteger close = new AtomicInteger();
final CountDownLatch _latch = new CountDownLatch(1);
final Exchanger<String> exchanger = new Exchanger<String>();
Future<WebSocket.Connection> future=client.open(new URI("ws://127.0.0.1:"+_serverPort+"/"),new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
open.set(true);
}
public void onClose(int closeCode, String message)
{
//System.err.println("CLOSE "+closeCode+" "+message);
close.set(closeCode);
_latch.countDown();
}
public void onMessage(String data)
{
try
{
exchanger.exchange(data);
}
catch (InterruptedException e)
{
// e.printStackTrace();
}
}
});
Socket socket = _server.accept();
socket.setSoTimeout(60000);
accept(socket);
WebSocket.Connection connection = future.get(250,TimeUnit.MILLISECONDS);
Assert.assertNotNull(connection);
Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get());
// define some messages to send server to client
byte[] send = new byte[] { (byte)0x81, (byte) 0x05,
(byte)'H', (byte)'e', (byte)'l', (byte)'l',(byte)'o' };
final int messages=100000;
final AtomicInteger m = new AtomicInteger();
// Set up a consumer of received messages that waits a while before consuming
Thread consumer = new Thread()
{
public void run()
{
try
{
Thread.sleep(2000);
while(m.get()<messages)
{
String msg =exchanger.exchange(null);
if ("Hello".equals(msg))
m.incrementAndGet();
else
throw new IllegalStateException("exchanged "+msg);
}
}
catch(InterruptedException e)
{
return;
}
catch(Exception e)
{
e.printStackTrace();
}
}
};
consumer.start();
long max=0;
long start=System.currentTimeMillis();
for (int i=0;i<messages;i++)
{
socket.getOutputStream().write(send,0,send.length);
socket.getOutputStream().flush();
if (i%100==0)
{
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (duration>max)
max=duration;
}
}
while(consumer.isAlive())
Thread.sleep(10);
Assert.assertTrue(max>1000); // writing was blocked
Assert.assertEquals(m.get(),messages);
// Close with code
start=System.currentTimeMillis();
socket.getOutputStream().write(new byte[]{(byte)0x88, (byte) 0x02, (byte)4, (byte)87 },0,4);
socket.getOutputStream().flush();
_latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(1111,close.get());
}
private void respondToClient(Socket connection, String serverResponse) throws IOException private void respondToClient(Socket connection, String serverResponse) throws IOException
{ {

View File

@ -11,6 +11,8 @@ import java.io.OutputStream;
import java.net.Socket; import java.net.Socket;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ -35,39 +37,45 @@ import org.junit.Test;
*/ */
public class WebSocketMessageD12Test public class WebSocketMessageD12Test
{ {
private static Server _server; private static Server __server;
private static Connector _connector; private static Connector __connector;
private static TestWebSocket _serverWebSocket; private static TestWebSocket __serverWebSocket;
private static CountDownLatch __latch;
private static AtomicInteger __textCount = new AtomicInteger(0);
@BeforeClass @BeforeClass
public static void startServer() throws Exception public static void startServer() throws Exception
{ {
_server = new Server(); __server = new Server();
_connector = new SelectChannelConnector(); __connector = new SelectChannelConnector();
_server.addConnector(_connector); __server.addConnector(__connector);
WebSocketHandler wsHandler = new WebSocketHandler() WebSocketHandler wsHandler = new WebSocketHandler()
{ {
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{ {
_serverWebSocket = new TestWebSocket(); __textCount.set(0);
_serverWebSocket._onConnect=("onConnect".equals(protocol)); __serverWebSocket = new TestWebSocket();
_serverWebSocket._echo=("echo".equals(protocol)); __serverWebSocket._onConnect=("onConnect".equals(protocol));
_serverWebSocket._aggregate=("aggregate".equals(protocol)); __serverWebSocket._echo=("echo".equals(protocol));
return _serverWebSocket; __serverWebSocket._aggregate=("aggregate".equals(protocol));
__serverWebSocket._latch=("latch".equals(protocol));
if (__serverWebSocket._latch)
__latch=new CountDownLatch(1);
return __serverWebSocket;
} }
}; };
wsHandler.setBufferSize(8192); wsHandler.setBufferSize(8192);
wsHandler.setMaxIdleTime(1000); wsHandler.setMaxIdleTime(1000);
wsHandler.setHandler(new DefaultHandler()); wsHandler.setHandler(new DefaultHandler());
_server.setHandler(wsHandler); __server.setHandler(wsHandler);
_server.start(); __server.start();
} }
@AfterClass @AfterClass
public static void stopServer() throws Exception public static void stopServer() throws Exception
{ {
_server.stop(); __server.stop();
_server.join(); __server.join();
} }
@ -80,7 +88,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testServerSendBigStringMessage() throws Exception public void testServerSendBigStringMessage() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -104,8 +112,8 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
// Server sends a big message // Server sends a big message
StringBuilder message = new StringBuilder(); StringBuilder message = new StringBuilder();
@ -113,7 +121,7 @@ public class WebSocketMessageD12Test
for (int i = 0; i < (0x2000) / text.length(); i++) for (int i = 0; i < (0x2000) / text.length(); i++)
message.append(text); message.append(text);
String data=message.toString(); String data=message.toString();
_serverWebSocket.connection.sendMessage(data); __serverWebSocket.connection.sendMessage(data);
assertEquals(WebSocketConnectionD12.OP_TEXT,input.read()); assertEquals(WebSocketConnectionD12.OP_TEXT,input.read());
assertEquals(0x7e,input.read()); assertEquals(0x7e,input.read());
@ -128,7 +136,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testServerSendOnConnect() throws Exception public void testServerSendOnConnect() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -152,8 +160,8 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
assertEquals(0x81,input.read()); assertEquals(0x81,input.read());
assertEquals(0x0f,input.read()); assertEquals(0x0f,input.read());
@ -163,7 +171,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testIdentityExtension() throws Exception public void testIdentityExtension() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -194,8 +202,8 @@ public class WebSocketMessageD12Test
lookFor("identity;",input); lookFor("identity;",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
assertEquals(0x81,input.read()); assertEquals(0x81,input.read());
assertEquals(0x0f,input.read()); assertEquals(0x0f,input.read());
@ -206,7 +214,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testFragmentExtension() throws Exception public void testFragmentExtension() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -232,8 +240,8 @@ public class WebSocketMessageD12Test
lookFor("fragment;",input); lookFor("fragment;",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
assertEquals(0x01,input.read()); assertEquals(0x01,input.read());
assertEquals(0x04,input.read()); assertEquals(0x04,input.read());
@ -260,7 +268,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testDeflateFrameExtension() throws Exception public void testDeflateFrameExtension() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -289,8 +297,8 @@ public class WebSocketMessageD12Test
lookFor("fragment;",input); lookFor("fragment;",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
// Server sends a big message // Server sends a big message
@ -354,10 +362,11 @@ public class WebSocketMessageD12Test
} }
@Test @Test
public void testServerEcho() throws Exception public void testServerEcho() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -390,18 +399,205 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
assertEquals(0x84,input.read()); assertEquals(0x84,input.read());
assertEquals(0x0f,input.read()); assertEquals(0x0f,input.read());
lookFor("this is an echo",input); lookFor("this is an echo",input);
} }
@Test
public void testBlockedConsumer() throws Exception
{
Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream();
byte[] bytes="This is a long message of text that we will send again and again".getBytes(StringUtil.__ISO_8859_1);
byte[] mesg=new byte[bytes.length+6];
mesg[0]=(byte)(0x80+WebSocketConnectionD12.OP_TEXT);
mesg[1]=(byte)(0x80+bytes.length);
mesg[2]=(byte)0xff;
mesg[3]=(byte)0xff;
mesg[4]=(byte)0xff;
mesg[5]=(byte)0xff;
for (int i=0;i<bytes.length;i++)
mesg[6+i]=(byte)(bytes[i]^0xff);
final int count = 100000;
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: latch\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(60000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(__serverWebSocket.connection);
__serverWebSocket.connection.setMaxIdleTime(60000);
// Send and receive 1 message
output.write(mesg);
output.flush();
while(__textCount.get()==0)
Thread.sleep(10);
// unblock the latch in 4s
new Thread()
{
public void run()
{
try
{
Thread.sleep(4000);
__latch.countDown();
//System.err.println("latched");
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start();
// Send enough messages to fill receive buffer
long max=0;
long start=System.currentTimeMillis();
for (int i=0;i<count;i++)
{
output.write(mesg);
if (i%100==0)
{
// System.err.println(">>> "+i);
output.flush();
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (max<duration)
max=duration;
}
}
Thread.sleep(50);
while(__textCount.get()<count+1)
{
System.err.println(__textCount.get()+"<"+(count+1));
Thread.sleep(10);
}
assertEquals(count+1,__textCount.get()); // all messages
assertTrue(max>2000); // was blocked
}
@Test
public void testBlockedProducer() throws Exception
{
final Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream();
final int count = 100000;
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: latch\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(60000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(__serverWebSocket.connection);
__serverWebSocket.connection.setMaxIdleTime(60000);
__latch.countDown();
// wait 2s and then consume messages
final AtomicLong totalB=new AtomicLong();
new Thread()
{
public void run()
{
try
{
Thread.sleep(2000);
byte[] recv = new byte[32*1024];
int len=0;
while (len>=0)
{
totalB.addAndGet(len);
len=socket.getInputStream().read(recv,0,recv.length);
Thread.sleep(10);
}
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start();
// Send enough messages to fill receive buffer
long max=0;
long start=System.currentTimeMillis();
String mesg="How Now Brown Cow";
for (int i=0;i<count;i++)
{
__serverWebSocket.connection.sendMessage(mesg);
if (i%100==0)
{
output.flush();
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (max<duration)
max=duration;
}
}
while(totalB.get()<(count*(mesg.length()+2)))
Thread.sleep(100);
assertEquals(count*(mesg.length()+2),totalB.get()); // all messages
assertTrue(max>1000); // was blocked
}
@Test @Test
public void testServerPingPong() throws Exception public void testServerPingPong() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
// Make sure the read times out if there are problems with the implementation // Make sure the read times out if there are problems with the implementation
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
@ -430,8 +626,8 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
socket.setSoTimeout(1000); socket.setSoTimeout(1000);
assertEquals(0x8A,input.read()); assertEquals(0x8A,input.read());
@ -439,9 +635,9 @@ public class WebSocketMessageD12Test
} }
@Test @Test
public void testMaxTextSize() throws Exception public void testMaxTextSizeFalseFrag() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -463,10 +659,62 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxTextMessageSize(15); __serverWebSocket.getConnection().setMaxTextMessageSize(10*1024);
__serverWebSocket.getConnection().setFakeFragments(true);
output.write(0x81);
output.write(0x80|0x7E);
output.write((byte)((16*1024)>>8));
output.write((byte)((16*1024)&0xff));
output.write(0x00);
output.write(0x00);
output.write(0x00);
output.write(0x00);
for (int i=0;i<(16*1024);i++)
output.write('X');
output.flush();
assertEquals(0x80|WebSocketConnectionD12.OP_CLOSE,input.read());
assertEquals(33,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(WebSocketConnectionD12.CLOSE_BADDATA,code);
lookFor("Text message size > 10240 chars",input);
}
@Test
public void testMaxTextSize() throws Exception
{
Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(__serverWebSocket.connection);
__serverWebSocket.getConnection().setMaxTextMessageSize(15);
output.write(0x01); output.write(0x01);
output.write(0x8a); output.write(0x8a);
@ -500,7 +748,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testMaxTextSize2() throws Exception public void testMaxTextSize2() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -522,10 +770,10 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxTextMessageSize(15); __serverWebSocket.getConnection().setMaxTextMessageSize(15);
output.write(0x01); output.write(0x01);
output.write(0x94); output.write(0x94);
@ -550,7 +798,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testBinaryAggregate() throws Exception public void testBinaryAggregate() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -572,9 +820,9 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(1024); __serverWebSocket.getConnection().setMaxBinaryMessageSize(1024);
output.write(WebSocketConnectionD12.OP_BINARY); output.write(WebSocketConnectionD12.OP_BINARY);
output.write(0x8a); output.write(0x8a);
@ -605,7 +853,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testMaxBinarySize() throws Exception public void testMaxBinarySize() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -627,10 +875,10 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(15); __serverWebSocket.getConnection().setMaxBinaryMessageSize(15);
output.write(0x02); output.write(0x02);
output.write(0x8a); output.write(0x8a);
@ -665,7 +913,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testMaxBinarySize2() throws Exception public void testMaxBinarySize2() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -687,10 +935,10 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(15); __serverWebSocket.getConnection().setMaxBinaryMessageSize(15);
output.write(0x02); output.write(0x02);
output.write(0x94); output.write(0x94);
@ -713,7 +961,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testIdle() throws Exception public void testIdle() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -737,15 +985,15 @@ public class WebSocketMessageD12Test
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
assertEquals(0x81,input.read()); assertEquals(0x81,input.read());
assertEquals(0x0f,input.read()); assertEquals(0x0f,input.read());
lookFor("sent on connect",input); lookFor("sent on connect",input);
assertEquals((byte)0x88,(byte)input.read()); assertEquals((byte)0x88,(byte)input.read());
assertEquals(17,input.read()); assertEquals(26,input.read());
assertEquals(1000/0x100,input.read()); assertEquals(1000/0x100,input.read());
assertEquals(1000%0x100,input.read()); assertEquals(1000%0x100,input.read());
lookFor("Idle",input); lookFor("Idle",input);
@ -760,10 +1008,10 @@ public class WebSocketMessageD12Test
output.flush(); output.flush();
assertTrue(_serverWebSocket.awaitDisconnected(5000)); assertTrue(__serverWebSocket.awaitDisconnected(5000));
try try
{ {
_serverWebSocket.connection.sendMessage("Don't send"); __serverWebSocket.connection.sendMessage("Don't send");
assertTrue(false); assertTrue(false);
} }
catch(IOException e) catch(IOException e)
@ -775,7 +1023,7 @@ public class WebSocketMessageD12Test
@Test @Test
public void testClose() throws Exception public void testClose() throws Exception
{ {
Socket socket = new Socket("localhost", _connector.getLocalPort()); Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream(); OutputStream output = socket.getOutputStream();
output.write( output.write(
("GET /chat HTTP/1.1\r\n"+ ("GET /chat HTTP/1.1\r\n"+
@ -800,20 +1048,20 @@ public class WebSocketMessageD12Test
skipTo("\r\n\r\n",input); skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000)); assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection); assertNotNull(__serverWebSocket.connection);
assertEquals(0x81,input.read()); assertEquals(0x81,input.read());
assertEquals(0x0f,input.read()); assertEquals(0x0f,input.read());
lookFor("sent on connect",input); lookFor("sent on connect",input);
socket.close(); socket.close();
assertTrue(_serverWebSocket.awaitDisconnected(500)); assertTrue(__serverWebSocket.awaitDisconnected(500));
try try
{ {
_serverWebSocket.connection.sendMessage("Don't send"); __serverWebSocket.connection.sendMessage("Don't send");
assertTrue(false); assertTrue(false);
} }
catch(IOException e) catch(IOException e)
@ -938,6 +1186,7 @@ public class WebSocketMessageD12Test
private static class TestWebSocket implements WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage private static class TestWebSocket implements WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage
{ {
protected boolean _latch;
boolean _onConnect=false; boolean _onConnect=false;
boolean _echo=true; boolean _echo=true;
boolean _aggregate=false; boolean _aggregate=false;
@ -945,7 +1194,7 @@ public class WebSocketMessageD12Test
private final CountDownLatch disconnected = new CountDownLatch(1); private final CountDownLatch disconnected = new CountDownLatch(1);
private volatile FrameConnection connection; private volatile FrameConnection connection;
public Connection getConnection() public FrameConnection getConnection()
{ {
return connection; return connection;
} }
@ -1028,6 +1277,19 @@ public class WebSocketMessageD12Test
public void onMessage(String data) public void onMessage(String data)
{ {
__textCount.incrementAndGet();
if (_latch)
{
try
{
__latch.await();
}
catch(Exception e)
{
e.printStackTrace();
}
}
if (_aggregate) if (_aggregate)
{ {
try try

View File

@ -307,13 +307,20 @@ public class WebSocketParserD12Test
_in.putUnmasked((byte)(2048&0xff)); _in.putUnmasked((byte)(2048&0xff));
_in.sendMask(); _in.sendMask();
for (int i=0;i<2048;i++) for (int i=0;i<2048;i++)
_in.put((byte)'a'); _in.put((byte)('a'+i%26));
int progress =_parser.parseNext(); int progress =_parser.parseNext();
assertTrue(progress>0); assertTrue(progress>0);
assertEquals(2,_handler._frames); assertEquals(2,_handler._frames);
assertEquals(WebSocketConnectionD12.OP_CONTINUATION,_handler._opcode); assertEquals(WebSocketConnectionD12.OP_CONTINUATION,_handler._opcode);
assertEquals(1,_handler._data.size());
String mesg=_handler._data.remove(0);
assertEquals(2048,mesg.length());
for (int i=0;i<2048;i++)
assertEquals(('a'+i%26),mesg.charAt(i));
} }
private class Handler implements WebSocketParser.FrameHandler private class Handler implements WebSocketParser.FrameHandler

View File

@ -350,6 +350,7 @@
<module>jetty-nested</module> <module>jetty-nested</module>
<module>jetty-overlay-deployer</module> <module>jetty-overlay-deployer</module>
<module>jetty-nosql</module> <module>jetty-nosql</module>
<module>jetty-distribution</module>
<module>test-continuation</module> <module>test-continuation</module>
<module>test-continuation-jetty6</module> <module>test-continuation-jetty6</module>
<module>test-jetty-servlet</module> <module>test-jetty-servlet</module>
@ -437,7 +438,6 @@
</activation> </activation>
<modules> <modules>
<module>jetty-aggregate</module> <module>jetty-aggregate</module>
<module>jetty-distribution</module>
</modules> </modules>
</profile> </profile>
<profile> <profile>