Added test for the lost messages bug.

This commit is contained in:
Simone Bordet 2011-08-31 18:29:09 +02:00
parent ba87334389
commit 0f8939dd96
1 changed files with 105 additions and 54 deletions

View File

@ -9,7 +9,6 @@ import java.net.ConnectException;
import java.net.ServerSocket; import java.net.ServerSocket;
import java.net.Socket; import java.net.Socket;
import java.net.URI; import java.net.URI;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Exchanger; import java.util.concurrent.Exchanger;
@ -23,9 +22,11 @@ import java.util.concurrent.atomic.AtomicLong;
import org.eclipse.jetty.util.BlockingArrayQueue; import org.eclipse.jetty.util.BlockingArrayQueue;
import org.eclipse.jetty.util.IO; import org.eclipse.jetty.util.IO;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.After; import org.junit.After;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Before; import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
public class WebSocketClientTest public class WebSocketClientTest
@ -33,25 +34,75 @@ public class WebSocketClientTest
private WebSocketClientFactory _factory = new WebSocketClientFactory(); private WebSocketClientFactory _factory = new WebSocketClientFactory();
private ServerSocket _server; private ServerSocket _server;
private int _serverPort; private int _serverPort;
@Before @Before
public void startServer() throws Exception public void startServer() throws Exception
{ {
_server = new ServerSocket(); _server = new ServerSocket();
_server.bind(null); _server.bind(null);
_serverPort = _server.getLocalPort(); _serverPort = _server.getLocalPort();
_factory.start(); _factory.start();
} }
@After @After
public void stopServer() throws Exception public void stopServer() throws Exception
{ {
if(_server != null) { if(_server != null) {
_server.close(); _server.close();
} }
_factory.stop(); _factory.stop();
} }
@Ignore
@Test
public void testMessageBiggerThanBufferSize() throws Exception
{
int bufferSize = 512;
WebSocketClientFactory factory = new WebSocketClientFactory(new QueuedThreadPool(), new ZeroMaskGen(), bufferSize);
factory.start();
WebSocketClient client = new WebSocketClient(factory);
final CountDownLatch openLatch = new CountDownLatch(1);
final CountDownLatch dataLatch = new CountDownLatch(1);
WebSocket.OnTextMessage websocket = new WebSocket.OnTextMessage()
{
public void onOpen(Connection connection)
{
openLatch.countDown();
}
public void onMessage(String data)
{
System.out.println("data = " + data);
dataLatch.countDown();
}
public void onClose(int closeCode, String message)
{
}
};
Future<WebSocket.Connection> future = client.open(new URI("ws://127.0.0.1:" + _serverPort + "/"), websocket);
Socket socket = _server.accept();
accept(socket);
Assert.assertTrue(openLatch.await(1, TimeUnit.SECONDS));
OutputStream serverOutput = socket.getOutputStream();
int length = bufferSize + bufferSize / 2;
serverOutput.write(0x80 | 0x01); // FIN + TEXT
serverOutput.write(0x7E); // No MASK and 2 bytes length
serverOutput.write(length >> 8); // first length byte
serverOutput.write(length & 0xFF); // second length byte
for (int i = 0; i < length; ++i)
serverOutput.write('x');
serverOutput.flush();
Assert.assertTrue(dataLatch.await(1000, TimeUnit.SECONDS));
factory.stop();
}
@Test @Test
public void testBadURL() throws Exception public void testBadURL() throws Exception
{ {
@ -67,11 +118,11 @@ public class WebSocketClientTest
{ {
open.set(true); open.set(true);
} }
public void onClose(int closeCode, String message) public void onClose(int closeCode, String message)
{} {}
}); });
Assert.fail(); Assert.fail();
} }
catch(IllegalArgumentException e) catch(IllegalArgumentException e)
@ -82,7 +133,7 @@ public class WebSocketClientTest
Assert.assertFalse(open.get()); Assert.assertFalse(open.get());
} }
@Test @Test
public void testAsyncConnectionRefused() throws Exception public void testAsyncConnectionRefused() throws Exception
{ {
@ -114,15 +165,15 @@ public class WebSocketClientTest
{ {
error=e.getCause(); error=e.getCause();
} }
Assert.assertFalse(open.get()); Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof ConnectException); Assert.assertTrue(error instanceof ConnectException);
}
}
@Test @Test
public void testConnectionNotAccepted() throws Exception public void testConnectionNotAccepted() throws Exception
{ {
@ -154,11 +205,11 @@ public class WebSocketClientTest
{ {
error=e; error=e;
} }
Assert.assertFalse(open.get()); Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException); Assert.assertTrue(error instanceof TimeoutException);
} }
@Test @Test
@ -193,14 +244,14 @@ public class WebSocketClientTest
{ {
error=e; error=e;
} }
Assert.assertFalse(open.get()); Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get());
Assert.assertTrue(error instanceof TimeoutException); Assert.assertTrue(error instanceof TimeoutException);
} }
@Test @Test
public void testBadHandshake() throws Exception public void testBadHandshake() throws Exception
{ {
@ -234,12 +285,12 @@ public class WebSocketClientTest
{ {
error=e.getCause(); error=e.getCause();
} }
Assert.assertFalse(open.get()); Assert.assertFalse(open.get());
Assert.assertEquals(WebSocketConnectionD12.CLOSE_PROTOCOL,close.get()); Assert.assertEquals(WebSocketConnectionD12.CLOSE_PROTOCOL,close.get());
Assert.assertTrue(error instanceof IOException); Assert.assertTrue(error instanceof IOException);
Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND")>0); Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND")>0);
} }
@Test @Test
@ -305,7 +356,7 @@ public class WebSocketClientTest
_latch.countDown(); _latch.countDown();
} }
}); });
Socket socket = _server.accept(); Socket socket = _server.accept();
accept(socket); accept(socket);
@ -313,12 +364,12 @@ public class WebSocketClientTest
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
Assert.assertTrue(open.get()); Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get()); Assert.assertEquals(0,close.get());
socket.close(); socket.close();
_latch.await(10,TimeUnit.SECONDS); _latch.await(10,TimeUnit.SECONDS);
Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get());
} }
@Test @Test
@ -343,7 +394,7 @@ public class WebSocketClientTest
_latch.countDown(); _latch.countDown();
} }
}); });
Socket socket = _server.accept(); Socket socket = _server.accept();
accept(socket); accept(socket);
@ -351,13 +402,13 @@ public class WebSocketClientTest
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
Assert.assertTrue(open.get()); Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get()); Assert.assertEquals(0,close.get());
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
_latch.await(10,TimeUnit.SECONDS); _latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000); Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(WebSocketConnectionD12.CLOSE_NORMAL,close.get()); Assert.assertEquals(WebSocketConnectionD12.CLOSE_NORMAL,close.get());
} }
@Test @Test
public void testNotIdle() throws Exception public void testNotIdle() throws Exception
@ -381,13 +432,13 @@ public class WebSocketClientTest
close.set(closeCode); close.set(closeCode);
_latch.countDown(); _latch.countDown();
} }
public void onMessage(String data) public void onMessage(String data)
{ {
queue.add(data); queue.add(data);
} }
}); });
Socket socket = _server.accept(); Socket socket = _server.accept();
accept(socket); accept(socket);
@ -395,9 +446,9 @@ public class WebSocketClientTest
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
Assert.assertTrue(open.get()); Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get()); Assert.assertEquals(0,close.get());
// Send some messages client to server // Send some messages client to server
byte[] recv = new byte[1024]; byte[] recv = new byte[1024];
int len=-1; int len=-1;
@ -411,7 +462,7 @@ public class WebSocketClientTest
// Send some messages server to client // Send some messages server to client
byte[] send = new byte[] { (byte)0x81, (byte) 0x02, (byte)'H', (byte)'i'}; byte[] send = new byte[] { (byte)0x81, (byte) 0x02, (byte)'H', (byte)'i'};
for (int i=0;i<10;i++) for (int i=0;i<10;i++)
{ {
Thread.sleep(250); Thread.sleep(250);
@ -428,7 +479,7 @@ public class WebSocketClientTest
_latch.await(10,TimeUnit.SECONDS); _latch.await(10,TimeUnit.SECONDS);
Assert.assertTrue(System.currentTimeMillis()-start<5000); Assert.assertTrue(System.currentTimeMillis()-start<5000);
Assert.assertEquals(1111,close.get()); Assert.assertEquals(1111,close.get());
} }
@ -453,12 +504,12 @@ public class WebSocketClientTest
close.set(closeCode); close.set(closeCode);
_latch.countDown(); _latch.countDown();
} }
public void onMessage(String data) public void onMessage(String data)
{ {
} }
}); });
final Socket socket = _server.accept(); final Socket socket = _server.accept();
accept(socket); accept(socket);
@ -469,7 +520,7 @@ public class WebSocketClientTest
final int messages=20000; final int messages=20000;
final AtomicLong totalB=new AtomicLong(); final AtomicLong totalB=new AtomicLong();
Thread consumer = new Thread() Thread consumer = new Thread()
{ {
public void run() public void run()
@ -498,7 +549,7 @@ public class WebSocketClientTest
} }
}; };
consumer.start(); consumer.start();
// Send lots of messages client to server // Send lots of messages client to server
long max=0; long max=0;
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
@ -515,7 +566,7 @@ public class WebSocketClientTest
max=duration; max=duration;
} }
} }
// wait for consumer to complete // wait for consumer to complete
while (totalB.get()<messages*(mesg.length()+6L)) while (totalB.get()<messages*(mesg.length()+6L))
Thread.sleep(10); Thread.sleep(10);
@ -524,7 +575,7 @@ public class WebSocketClientTest
consumer.interrupt(); consumer.interrupt();
} }
@Test @Test
public void testBlockReceiving() throws Exception public void testBlockReceiving() throws Exception
@ -549,7 +600,7 @@ public class WebSocketClientTest
close.set(closeCode); close.set(closeCode);
_latch.countDown(); _latch.countDown();
} }
public void onMessage(String data) public void onMessage(String data)
{ {
try try
@ -562,7 +613,7 @@ public class WebSocketClientTest
} }
} }
}); });
Socket socket = _server.accept(); Socket socket = _server.accept();
socket.setSoTimeout(60000); socket.setSoTimeout(60000);
accept(socket); accept(socket);
@ -571,14 +622,14 @@ public class WebSocketClientTest
Assert.assertNotNull(connection); Assert.assertNotNull(connection);
Assert.assertTrue(open.get()); Assert.assertTrue(open.get());
Assert.assertEquals(0,close.get()); Assert.assertEquals(0,close.get());
// define some messages to send server to client // define some messages to send server to client
byte[] send = new byte[] { (byte)0x81, (byte) 0x05, byte[] send = new byte[] { (byte)0x81, (byte) 0x05,
(byte)'H', (byte)'e', (byte)'l', (byte)'l',(byte)'o' }; (byte)'H', (byte)'e', (byte)'l', (byte)'l',(byte)'o' };
final int messages=100000; final int messages=100000;
final AtomicInteger m = new AtomicInteger(); final AtomicInteger m = new AtomicInteger();
// Set up a consumer of received messages that waits a while before consuming // Set up a consumer of received messages that waits a while before consuming
Thread consumer = new Thread() Thread consumer = new Thread()
{ {
@ -607,8 +658,8 @@ public class WebSocketClientTest
} }
}; };
consumer.start(); consumer.start();
long max=0; long max=0;
long start=System.currentTimeMillis(); long start=System.currentTimeMillis();
for (int i=0;i<messages;i++) for (int i=0;i<messages;i++)
@ -624,14 +675,14 @@ public class WebSocketClientTest
max=duration; max=duration;
} }
} }
while(consumer.isAlive()) while(consumer.isAlive())
Thread.sleep(10); Thread.sleep(10);
Assert.assertTrue(max>1000); // writing was blocked Assert.assertTrue(max>1000); // writing was blocked
Assert.assertEquals(m.get(),messages); Assert.assertEquals(m.get(),messages);
// Close with code // Close with code
start=System.currentTimeMillis(); start=System.currentTimeMillis();
socket.getOutputStream().write(new byte[]{(byte)0x88, (byte) 0x02, (byte)4, (byte)87 },0,4); socket.getOutputStream().write(new byte[]{(byte)0x88, (byte) 0x02, (byte)4, (byte)87 },0,4);
@ -654,10 +705,10 @@ public class WebSocketClientTest
isr = new InputStreamReader(in); isr = new InputStreamReader(in);
buf = new BufferedReader(isr); buf = new BufferedReader(isr);
String line; String line;
while((line = buf.readLine())!=null) while((line = buf.readLine())!=null)
{ {
// System.err.println(line); // System.err.println(line);
if(line.length() == 0) if(line.length() == 0)
{ {
// Got the "\r\n" line. // Got the "\r\n" line.
break; break;
@ -668,8 +719,8 @@ public class WebSocketClientTest
out = connection.getOutputStream(); out = connection.getOutputStream();
out.write(serverResponse.getBytes()); out.write(serverResponse.getBytes());
out.flush(); out.flush();
} }
finally finally
{ {
IO.close(buf); IO.close(buf);
IO.close(isr); IO.close(isr);