diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java index 9513a9eef07..ba869f584e0 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java @@ -12,6 +12,7 @@ import java.net.Socket; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import java.util.zip.Deflater; import java.util.zip.Inflater; @@ -503,6 +504,95 @@ public class WebSocketMessageD12Test assertEquals(count+1,__textCount.get()); // all messages assertTrue(max>2000); // was blocked } + + @Test + public void testBlockedProducer() throws Exception + { + final Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + + final int count = 100000; + + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: latch\r\n" + + "Sec-WebSocket-Version: 7\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(60000); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + __serverWebSocket.connection.setMaxIdleTime(60000); + __latch.countDown(); + + // wait 2s and then consume messages + final AtomicLong totalB=new AtomicLong(); + new Thread() + { + public void run() + { + try + { + Thread.sleep(2000); + + byte[] recv = new byte[32*1024]; + + int len=0; + while (len>=0) + { + totalB.addAndGet(len); + len=socket.getInputStream().read(recv,0,recv.length); + Thread.sleep(10); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + }.start(); + + + // Send enough messages to fill receive buffer + long max=0; + long start=System.currentTimeMillis(); + String mesg="How Now Brown Cow"; + for (int i=0;i1000); // was blocked + } @Test public void testServerPingPong() throws Exception diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD12Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD12Test.java index bc58fac7a92..3685eefb0da 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD12Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD12Test.java @@ -307,13 +307,20 @@ public class WebSocketParserD12Test _in.putUnmasked((byte)(2048&0xff)); _in.sendMask(); for (int i=0;i<2048;i++) - _in.put((byte)'a'); + _in.put((byte)('a'+i%26)); int progress =_parser.parseNext(); assertTrue(progress>0); assertEquals(2,_handler._frames); assertEquals(WebSocketConnectionD12.OP_CONTINUATION,_handler._opcode); + assertEquals(1,_handler._data.size()); + String mesg=_handler._data.remove(0); + + assertEquals(2048,mesg.length()); + + for (int i=0;i<2048;i++) + assertEquals(('a'+i%26),mesg.charAt(i)); } private class Handler implements WebSocketParser.FrameHandler