353073 blocking tests

This commit is contained in:
Greg Wilkins 2011-08-31 18:35:40 +10:00
parent 582655ef52
commit 82e364730b
2 changed files with 98 additions and 1 deletions

View File

@ -12,6 +12,7 @@ import java.net.Socket;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.zip.Deflater; import java.util.zip.Deflater;
import java.util.zip.Inflater; import java.util.zip.Inflater;
@ -504,6 +505,95 @@ public class WebSocketMessageD12Test
assertTrue(max>2000); // was blocked assertTrue(max>2000); // was blocked
} }
@Test
public void testBlockedProducer() throws Exception
{
final Socket socket = new Socket("localhost", __connector.getLocalPort());
OutputStream output = socket.getOutputStream();
final int count = 100000;
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: latch\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(60000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(__serverWebSocket.awaitConnected(1000));
assertNotNull(__serverWebSocket.connection);
__serverWebSocket.connection.setMaxIdleTime(60000);
__latch.countDown();
// wait 2s and then consume messages
final AtomicLong totalB=new AtomicLong();
new Thread()
{
public void run()
{
try
{
Thread.sleep(2000);
byte[] recv = new byte[32*1024];
int len=0;
while (len>=0)
{
totalB.addAndGet(len);
len=socket.getInputStream().read(recv,0,recv.length);
Thread.sleep(10);
}
}
catch(Exception e)
{
e.printStackTrace();
}
}
}.start();
// Send enough messages to fill receive buffer
long max=0;
long start=System.currentTimeMillis();
String mesg="How Now Brown Cow";
for (int i=0;i<count;i++)
{
__serverWebSocket.connection.sendMessage(mesg);
if (i%100==0)
{
output.flush();
long now=System.currentTimeMillis();
long duration=now-start;
start=now;
if (max<duration)
max=duration;
}
}
while(totalB.get()<(count*(mesg.length()+2)))
Thread.sleep(100);
assertEquals(count*(mesg.length()+2),totalB.get()); // all messages
assertTrue(max>1000); // was blocked
}
@Test @Test
public void testServerPingPong() throws Exception public void testServerPingPong() throws Exception
{ {

View File

@ -307,13 +307,20 @@ public class WebSocketParserD12Test
_in.putUnmasked((byte)(2048&0xff)); _in.putUnmasked((byte)(2048&0xff));
_in.sendMask(); _in.sendMask();
for (int i=0;i<2048;i++) for (int i=0;i<2048;i++)
_in.put((byte)'a'); _in.put((byte)('a'+i%26));
int progress =_parser.parseNext(); int progress =_parser.parseNext();
assertTrue(progress>0); assertTrue(progress>0);
assertEquals(2,_handler._frames); assertEquals(2,_handler._frames);
assertEquals(WebSocketConnectionD12.OP_CONTINUATION,_handler._opcode); assertEquals(WebSocketConnectionD12.OP_CONTINUATION,_handler._opcode);
assertEquals(1,_handler._data.size());
String mesg=_handler._data.remove(0);
assertEquals(2048,mesg.length());
for (int i=0;i<2048;i++)
assertEquals(('a'+i%26),mesg.charAt(i));
} }
private class Handler implements WebSocketParser.FrameHandler private class Handler implements WebSocketParser.FrameHandler