Fixing BlockheadClient and reading of frames

This commit is contained in:
Joakim Erdfelt 2012-07-30 12:40:01 -07:00
parent f34d74ec84
commit 30ae8370bd
13 changed files with 218 additions and 166 deletions

View File

@ -17,12 +17,12 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.helper.EchoServlet; import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -73,8 +73,8 @@ public class DeflateExtensionTest
client.write(WebSocketFrame.text(msg.toString())); client.write(WebSocketFrame.text(msg.toString()));
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,1000); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString())); Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is(msg.toString()));
} }
finally finally

View File

@ -17,12 +17,12 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.helper.EchoServlet; import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -83,10 +83,10 @@ public class FragmentExtensionTest
client.write(WebSocketFrame.text(msg)); client.write(WebSocketFrame.text(msg));
String parts[] = split(msg,fragSize); String parts[] = split(msg,fragSize);
Queue<WebSocketFrame> frames = client.readFrames(parts.length,TimeUnit.MILLISECONDS,1000); IncomingFramesCapture capture = client.readFrames(parts.length,TimeUnit.MILLISECONDS,1000);
for (int i = 0; i < parts.length; i++) for (int i = 0; i < parts.length; i++)
{ {
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(i);
Assert.assertThat("text[" + i + "].payload",frame.getPayloadAsUTF8(),is(parts[i])); Assert.assertThat("text[" + i + "].payload",frame.getPayloadAsUTF8(),is(parts[i]));
} }
} }

View File

@ -17,12 +17,12 @@ package org.eclipse.jetty.websocket.server;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.helper.EchoServlet; import org.eclipse.jetty.websocket.server.helper.EchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -66,8 +66,8 @@ public class IdentityExtensionTest
client.write(WebSocketFrame.text("Hello")); client.write(WebSocketFrame.text("Hello"));
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,1000); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is("Hello")); Assert.assertThat("TEXT.payload",frame.getPayloadAsUTF8(),is("Hello"));
} }
finally finally

View File

@ -20,7 +20,6 @@ import static org.hamcrest.Matchers.*;
import java.net.SocketException; import java.net.SocketException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception; import org.eclipse.jetty.util.Utf8Appendable.NotUtf8Exception;
@ -31,6 +30,7 @@ import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.eclipse.jetty.websocket.server.helper.RFCServlet; import org.eclipse.jetty.websocket.server.helper.RFCServlet;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
@ -98,8 +98,8 @@ public class WebSocketServletRFCTest
client.write(bin); // write buf3 (fin=true) client.write(bin); // write buf3 (fin=true)
// Read frame echo'd back (hopefully a single binary frame) // Read frame echo'd back (hopefully a single binary frame)
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,1000); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,1000);
WebSocketFrame binmsg = frames.remove(); WebSocketFrame binmsg = capture.getFrames().get(0);
int expectedSize = buf1.length + buf2.length + buf3.length; int expectedSize = buf1.length + buf2.length + buf3.length;
Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(expectedSize)); Assert.assertThat("BinaryFrame.payloadLength",binmsg.getPayloadLength(),is(expectedSize));
@ -164,8 +164,8 @@ public class WebSocketServletRFCTest
client.write(WebSocketFrame.text(msg)); client.write(WebSocketFrame.text(msg));
// Read frame (hopefully text frame) // Read frame (hopefully text frame)
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame tf = frames.remove(); WebSocketFrame tf = capture.getFrames().get(0);
Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg)); Assert.assertThat("Text Frame.status code",tf.getPayloadAsUTF8(),is(msg));
} }
finally finally
@ -193,9 +193,9 @@ public class WebSocketServletRFCTest
// now wait for the server to time out // now wait for the server to time out
// should be 2 frames, the TextFrame echo, and then the Close on disconnect // should be 2 frames, the TextFrame echo, and then the Close on disconnect
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.SECONDS,2); IncomingFramesCapture capture = client.readFrames(2,TimeUnit.SECONDS,2);
Assert.assertThat("frames[0].opcode",frames.remove().getOpCode(),is(OpCode.TEXT)); Assert.assertThat("frames[0].opcode",capture.getFrames().get(0).getOpCode(),is(OpCode.TEXT));
Assert.assertThat("frames[1].opcode",frames.remove().getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frames[1].opcode",capture.getFrames().get(1).getOpCode(),is(OpCode.CLOSE));
} }
finally finally
{ {
@ -221,8 +221,8 @@ public class WebSocketServletRFCTest
client.write(WebSocketFrame.text("CRASH")); client.write(WebSocketFrame.text("CRASH"));
// Read frame (hopefully close frame) // Read frame (hopefully close frame)
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame cf = frames.remove(); WebSocketFrame cf = capture.getFrames().get(0);
CloseInfo close = new CloseInfo(cf); CloseInfo close = new CloseInfo(cf);
Assert.assertThat("Close Frame.status code",close.getStatusCode(),is(StatusCode.SERVER_ERROR)); Assert.assertThat("Close Frame.status code",close.getStatusCode(),is(StatusCode.SERVER_ERROR));
} }
@ -261,8 +261,8 @@ public class WebSocketServletRFCTest
Assert.assertThat("Exception",e.getMessage(),containsString("Broken pipe")); Assert.assertThat("Exception",e.getMessage(),containsString("Broken pipe"));
} }
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.SECONDS,1); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame); CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE)); Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE));
@ -302,8 +302,8 @@ public class WebSocketServletRFCTest
Assert.assertThat("Exception",e.getMessage(),containsString("Broken pipe")); Assert.assertThat("Exception",e.getMessage(),containsString("Broken pipe"));
} }
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.SECONDS,1); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame); CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE)); Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.MESSAGE_TOO_LARGE));
@ -332,8 +332,8 @@ public class WebSocketServletRFCTest
ByteBuffer bb = generator.generate(txt); ByteBuffer bb = generator.generate(txt);
client.writeRaw(bb); client.writeRaw(bb);
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.SECONDS,1); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frames[0].opcode",frame.getOpCode(),is(OpCode.CLOSE));
CloseInfo close = new CloseInfo(frame); CloseInfo close = new CloseInfo(frame);
Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.BAD_PAYLOAD)); Assert.assertThat("Close Status Code",close.getStatusCode(),is(StatusCode.BAD_PAYLOAD));

View File

@ -15,7 +15,7 @@ public abstract class AbstractABCase
protected static final byte FIN = (byte)0x80; protected static final byte FIN = (byte)0x80;
protected static final byte NOFIN = 0x00; protected static final byte NOFIN = 0x00;
private static final byte MASKED_BIT = (byte)0x80; private static final byte MASKED_BIT = (byte)0x80;
private static final byte[] MASK = protected static final byte[] MASK =
{ 0x12, 0x34, 0x56, 0x78 }; { 0x12, 0x34, 0x56, 0x78 };
protected static Generator strictGenerator; protected static Generator strictGenerator;

View File

@ -19,7 +19,6 @@ import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Arrays; import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -30,6 +29,7 @@ import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.ByteBufferAssert; import org.eclipse.jetty.websocket.server.ByteBufferAssert;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.Assert; import org.junit.Assert;
import org.junit.Test; import org.junit.Test;
@ -65,15 +65,15 @@ public class TestABCase1 extends AbstractABCase
client.flush(); client.flush();
// Read frames // Read frames
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500);
// Validate echo'd frame // Validate echo'd frame
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frame should be " + opcode + " frame",frame.getOpCode(),is(opcode)); Assert.assertThat("frame should be " + opcode + " frame",frame.getOpCode(),is(opcode));
Assert.assertThat(opcode + ".payloadLength",frame.getPayloadLength(),is(0)); Assert.assertThat(opcode + ".payloadLength",frame.getPayloadLength(),is(0));
// Validate close // Validate close
frame = frames.remove(); frame = capture.getFrames().get(1);
Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
close = new CloseInfo(frame); close = new CloseInfo(frame);
Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
@ -108,23 +108,25 @@ public class TestABCase1 extends AbstractABCase
// Prepare Close Frame // Prepare Close Frame
CloseInfo close = new CloseInfo(StatusCode.NORMAL); CloseInfo close = new CloseInfo(StatusCode.NORMAL);
buf = strictGenerator.generate(close.asFrame()); WebSocketFrame closeFrame = close.asFrame();
closeFrame.setMask(MASK);
buf = strictGenerator.generate(closeFrame);
// Write Close Frame // Write Close Frame
client.writeRaw(buf); client.writeRaw(buf);
client.flush(); client.flush();
// Read frames // Read frames
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,1000); IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,1000);
// Validate echo'd frame // Validate echo'd frame
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frame should be " + opcode + " frame",frame.getOpCode(),is(opcode)); Assert.assertThat("frame should be " + opcode + " frame",frame.getOpCode(),is(opcode));
Assert.assertThat(opcode + ".payloadLength",frame.getPayloadLength(),is(payload.length)); Assert.assertThat(opcode + ".payloadLength",frame.getPayloadLength(),is(payload.length));
ByteBufferAssert.assertEquals(opcode + ".payload",payload,frame.getPayload()); ByteBufferAssert.assertEquals(opcode + ".payload",payload,frame.getPayload());
// Validate close // Validate close
frame = frames.remove(); frame = capture.getFrames().get(1);
Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
close = new CloseInfo(frame); close = new CloseInfo(frame);
Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));
@ -181,16 +183,16 @@ public class TestABCase1 extends AbstractABCase
client.flush(); client.flush();
// Read frames // Read frames
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500);
// Validate echo'd frame // Validate echo'd frame
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frame should be " + opcode + " frame",frame.getOpCode(),is(opcode)); Assert.assertThat("frame should be " + opcode + " frame",frame.getOpCode(),is(opcode));
Assert.assertThat(opcode + ".payloadLength",frame.getPayloadLength(),is(payload.length)); Assert.assertThat(opcode + ".payloadLength",frame.getPayloadLength(),is(payload.length));
ByteBufferAssert.assertEquals(opcode + ".payload",payload,frame.getPayload()); ByteBufferAssert.assertEquals(opcode + ".payload",payload,frame.getPayload());
// Validate close // Validate close
frame = frames.remove(); frame = capture.getFrames().get(1);
Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("CLOSE.frame.opcode",frame.getOpCode(),is(OpCode.CLOSE));
close = new CloseInfo(frame); close = new CloseInfo(frame);
Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL)); Assert.assertThat("CLOSE.statusCode",close.getStatusCode(),is(StatusCode.NORMAL));

View File

@ -18,7 +18,6 @@ package org.eclipse.jetty.websocket.server.ab;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
@ -34,6 +33,7 @@ import org.eclipse.jetty.websocket.server.ByteBufferAssert;
import org.eclipse.jetty.websocket.server.SimpleServletServer; import org.eclipse.jetty.websocket.server.SimpleServletServer;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet; import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -110,8 +110,8 @@ public class TestABCase5
client.writeRaw(buf2); client.writeRaw(buf2);
// Read frame // Read frame
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().get(0);
Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE));
@ -144,8 +144,8 @@ public class TestABCase5
client.writeRaw(buf2); client.writeRaw(buf2);
// Read frame // Read frame
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE));
@ -197,8 +197,8 @@ public class TestABCase5
client.writeRaw(buf2); client.writeRaw(buf2);
// Read frame // Read frame
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE));
@ -231,8 +231,8 @@ public class TestABCase5
client.writeRaw(buf2); client.writeRaw(buf2);
// Read frame // Read frame
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE));
Assert.assertThat("CloseFrame.status code",new CloseInfo(frame).getStatusCode(),is(1002)); Assert.assertThat("CloseFrame.status code",new CloseInfo(frame).getStatusCode(),is(1002));
@ -283,8 +283,8 @@ public class TestABCase5
client.writeRaw(buf2); client.writeRaw(buf2);
// Read frame // Read frame
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("frame should be text frame",frame.getOpCode(),is(OpCode.TEXT)); Assert.assertThat("frame should be text frame",frame.getOpCode(),is(OpCode.TEXT));
@ -357,15 +357,15 @@ public class TestABCase5
client.writeRaw(buf2); client.writeRaw(buf2);
// Should be 2 frames, pong frame followed by combined echo'd text frame // Should be 2 frames, pong frame followed by combined echo'd text frame
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.SECONDS,1); IncomingFramesCapture capture = client.readFrames(2,TimeUnit.SECONDS,1);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG)); Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG));
ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET); ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET);
ByteBufferAssert.assertEquals("payloads should be equal",payload1,frame.getPayload()); ByteBufferAssert.assertEquals("payloads should be equal",payload1,frame.getPayload());
frame = frames.remove(); frame = capture.getFrames().pop();
Assert.assertThat("second frame should be text frame",frame.getOpCode(),is(OpCode.TEXT)); Assert.assertThat("second frame should be text frame",frame.getOpCode(),is(OpCode.TEXT));
Assert.assertThat("TextFrame.payload",frame.getPayloadAsUTF8(),is(fragment1 + fragment2)); Assert.assertThat("TextFrame.payload",frame.getPayloadAsUTF8(),is(fragment1 + fragment2));
@ -405,15 +405,15 @@ public class TestABCase5
client.writeRaw(buf3); client.writeRaw(buf3);
// Should be 2 frames, pong frame followed by combined echo'd text frame // Should be 2 frames, pong frame followed by combined echo'd text frame
Queue<WebSocketFrame> frames = client.readFrames(2,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(2,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG)); Assert.assertThat("first frame should be pong frame",frame.getOpCode(),is(OpCode.PONG));
ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET); ByteBuffer payload1 = BufferUtil.toBuffer(pingPayload,StringUtil.__UTF8_CHARSET);
ByteBufferAssert.assertEquals("Payload",payload1,frame.getPayload()); ByteBufferAssert.assertEquals("Payload",payload1,frame.getPayload());
frame = frames.remove(); frame = capture.getFrames().pop();
Assert.assertThat("second frame should be text frame",frame.getOpCode(),is(OpCode.TEXT)); Assert.assertThat("second frame should be text frame",frame.getOpCode(),is(OpCode.TEXT));
@ -456,8 +456,8 @@ public class TestABCase5
client.writeRaw(buf); client.writeRaw(buf);
// Read frame // Read frame
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame frame = frames.remove(); WebSocketFrame frame = capture.getFrames().pop();
Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE)); Assert.assertThat("frame should be close frame",frame.getOpCode(),is(OpCode.CLOSE));

View File

@ -21,7 +21,6 @@ import java.nio.ByteBuffer;
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collection; import java.util.Collection;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
@ -33,6 +32,7 @@ import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.SimpleServletServer; import org.eclipse.jetty.websocket.server.SimpleServletServer;
import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient; import org.eclipse.jetty.websocket.server.blockhead.BlockheadClient;
import org.eclipse.jetty.websocket.server.examples.MyEchoServlet; import org.eclipse.jetty.websocket.server.examples.MyEchoServlet;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.AfterClass; import org.junit.AfterClass;
import org.junit.Assert; import org.junit.Assert;
import org.junit.BeforeClass; import org.junit.BeforeClass;
@ -130,8 +130,8 @@ public class TestABCase7_9
client.writeRaw(buf); client.writeRaw(buf);
// Read frame (hopefully text frame) // Read frame (hopefully text frame)
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame closeFrame = frames.remove(); WebSocketFrame closeFrame = capture.getFrames().pop();
Assert.assertThat("CloseFrame.status code",new CloseInfo(closeFrame).getStatusCode(),is(1002)); Assert.assertThat("CloseFrame.status code",new CloseInfo(closeFrame).getStatusCode(),is(1002));
} }
finally finally
@ -172,8 +172,8 @@ public class TestABCase7_9
client.writeRaw(buf); client.writeRaw(buf);
// Read frame (hopefully text frame) // Read frame (hopefully text frame)
Queue<WebSocketFrame> frames = client.readFrames(1,TimeUnit.MILLISECONDS,500); IncomingFramesCapture capture = client.readFrames(1,TimeUnit.MILLISECONDS,500);
WebSocketFrame closeFrame = frames.remove(); WebSocketFrame closeFrame = capture.getFrames().pop();
Assert.assertThat("CloseFrame.status code",new CloseInfo(closeFrame).getStatusCode(),is(1002)); Assert.assertThat("CloseFrame.status code",new CloseInfo(closeFrame).getStatusCode(),is(1002));
} }
finally finally

View File

@ -34,8 +34,6 @@ import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
@ -65,6 +63,7 @@ import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.protocol.OpCode; import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.Parser; import org.eclipse.jetty.websocket.protocol.Parser;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.eclipse.jetty.websocket.server.helper.IncomingFramesCapture;
import org.junit.Assert; import org.junit.Assert;
/** /**
@ -90,7 +89,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
private final WebSocketPolicy policy; private final WebSocketPolicy policy;
private final Generator generator; private final Generator generator;
private final Parser parser; private final Parser parser;
private final LinkedBlockingDeque<WebSocketFrame> incomingFrameQueue; private final IncomingFramesCapture incomingFrameQueue;
private final WebSocketExtensionRegistry extensionRegistry; private final WebSocketExtensionRegistry extensionRegistry;
private Socket socket; private Socket socket;
@ -123,7 +122,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
parser = new Parser(policy); parser = new Parser(policy);
parseCount = new AtomicInteger(0); parseCount = new AtomicInteger(0);
incomingFrameQueue = new LinkedBlockingDeque<>(); incomingFrameQueue = new IncomingFramesCapture();
extensionRegistry = new WebSocketExtensionRegistry(policy,bufferPool); extensionRegistry = new WebSocketExtensionRegistry(policy,bufferPool);
} }
@ -303,7 +302,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
@Override @Override
public void incoming(WebSocketException e) public void incoming(WebSocketException e)
{ {
LOG.warn(e); incomingFrameQueue.incoming(e);
} }
@Override @Override
@ -315,11 +314,8 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
{ {
LOG.info("Client parsed {} frames",count); LOG.info("Client parsed {} frames",count);
} }
WebSocketFrame copy = new WebSocketFrame(frame); // make a copy WebSocketFrame copy = new WebSocketFrame(frame);
if (!incomingFrameQueue.offerLast(copy)) incomingFrameQueue.incoming(copy);
{
throw new RuntimeException("Unable to queue incoming frame: " + copy);
}
} }
public void lookFor(String string) throws IOException public void lookFor(String string) throws IOException
@ -380,7 +376,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
return len; return len;
} }
public Queue<WebSocketFrame> readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{ {
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount); LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrameQueue.size(); int startCount = incomingFrameQueue.size();
@ -415,6 +411,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
} }
if (!debug && (System.currentTimeMillis() > expireOn)) if (!debug && (System.currentTimeMillis() > expireOn))
{ {
incomingFrameQueue.dump();
throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount, throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount,
incomingFrameQueue.size())); incomingFrameQueue.size()));
} }

View File

@ -17,35 +17,46 @@ package org.eclipse.jetty.websocket.server.helper;
import static org.hamcrest.Matchers.*; import static org.hamcrest.Matchers.*;
import java.util.ArrayList; import java.util.LinkedList;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger; import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException; import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.io.IncomingFrames; import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame; import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert; import org.junit.Assert;
public class FrameParseCapture implements IncomingFrames public class IncomingFramesCapture implements IncomingFrames
{ {
private static final Logger LOG = Log.getLogger(FrameParseCapture.class); private static final Logger LOG = Log.getLogger(IncomingFramesCapture.class);
private List<WebSocketFrame> frames = new ArrayList<>(); private LinkedList<WebSocketFrame> frames = new LinkedList<>();
private List<WebSocketException> errors = new ArrayList<>(); private LinkedList<WebSocketException> errors = new LinkedList<>();
public void assertErrorCount(int expectedCount)
{
Assert.assertThat("Captured error count",errors.size(),is(expectedCount));
}
public void assertFrameCount(int expectedCount)
{
Assert.assertThat("Captured frame count",frames.size(),is(expectedCount));
}
public void assertHasErrors(Class<? extends WebSocketException> errorType, int expectedCount) public void assertHasErrors(Class<? extends WebSocketException> errorType, int expectedCount)
{ {
Assert.assertThat(errorType.getSimpleName(),getErrorCount(errorType),is(expectedCount)); Assert.assertThat(errorType.getSimpleName(),getErrorCount(errorType),is(expectedCount));
} }
public void assertHasFrame(Class<? extends WebSocketFrame> frameType) public void assertHasFrame(OpCode op)
{ {
Assert.assertThat(frameType.getSimpleName(),getFrameCount(frameType),greaterThanOrEqualTo(1)); Assert.assertThat(op.name(),getFrameCount(op),greaterThanOrEqualTo(1));
} }
public void assertHasFrame(Class<? extends WebSocketFrame> frameType, int expectedCount) public void assertHasFrame(OpCode op, int expectedCount)
{ {
Assert.assertThat(frameType.getSimpleName(),getFrameCount(frameType),is(expectedCount)); Assert.assertThat(op.name(),getFrameCount(op),is(expectedCount));
} }
public void assertHasNoFrames() public void assertHasNoFrames()
@ -58,6 +69,17 @@ public class FrameParseCapture implements IncomingFrames
Assert.assertThat("Has no errors",errors.size(),is(0)); Assert.assertThat("Has no errors",errors.size(),is(0));
} }
public void dump()
{
System.out.printf("Captured %d incoming frames%n",frames.size());
for (int i = 0; i < frames.size(); i++)
{
WebSocketFrame frame = frames.get(i);
System.out.printf("[%3d] %s%n",i,frame);
System.out.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
public int getErrorCount(Class<? extends WebSocketException> errorType) public int getErrorCount(Class<? extends WebSocketException> errorType)
{ {
int count = 0; int count = 0;
@ -71,17 +93,17 @@ public class FrameParseCapture implements IncomingFrames
return count; return count;
} }
public List<WebSocketException> getErrors() public LinkedList<WebSocketException> getErrors()
{ {
return errors; return errors;
} }
public int getFrameCount(Class<? extends WebSocketFrame> frameType) public int getFrameCount(OpCode op)
{ {
int count = 0; int count = 0;
for (WebSocketFrame frame : frames) for (WebSocketFrame frame : frames)
{ {
if (frameType.isInstance(frame)) if (frame.getOpCode() == op)
{ {
count++; count++;
} }
@ -89,7 +111,7 @@ public class FrameParseCapture implements IncomingFrames
return count; return count;
} }
public List<WebSocketFrame> getFrames() public LinkedList<WebSocketFrame> getFrames()
{ {
return frames; return frames;
} }
@ -97,7 +119,7 @@ public class FrameParseCapture implements IncomingFrames
@Override @Override
public void incoming(WebSocketException e) public void incoming(WebSocketException e)
{ {
LOG.warn(e); LOG.debug(e);
errors.add(e); errors.add(e);
} }
@ -106,4 +128,9 @@ public class FrameParseCapture implements IncomingFrames
{ {
frames.add(frame); frames.add(frame);
} }
public int size()
{
return frames.size();
}
} }

View File

@ -1,77 +0,0 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server.helper;
import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketAdapter;
import org.eclipse.jetty.websocket.api.WebSocketConnection;
public class MessageSender extends WebSocketAdapter
{
private CountDownLatch connectLatch = new CountDownLatch(1);
private int closeCode = -1;
private String closeMessage = null;
public void awaitConnect() throws InterruptedException
{
connectLatch.await(1,TimeUnit.SECONDS);
}
public void close()
{
try
{
getConnection().close(StatusCode.NORMAL,null);
}
catch (IOException e)
{
e.printStackTrace();
}
}
public int getCloseCode()
{
return closeCode;
}
public String getCloseMessage()
{
return closeMessage;
}
@Override
public void onWebSocketClose(int statusCode, String reason)
{
this.closeCode = statusCode;
this.closeMessage = reason;
}
@Override
public void onWebSocketConnect(WebSocketConnection connection)
{
super.onWebSocketConnect(connection);
connectLatch.countDown();
}
public void sendMessage(String format, Object... args) throws IOException
{
getBlockingConnection().write(String.format(format,args));
}
}

View File

@ -0,0 +1,99 @@
// ========================================================================
// Copyright 2011-2012 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
//========================================================================
package org.eclipse.jetty.websocket.server.helper;
import static org.hamcrest.Matchers.*;
import java.util.LinkedList;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.websocket.io.OutgoingFrames;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
public class OutgoingFramesCapture implements OutgoingFrames
{
public static class Write<C>
{
public C context;
public Callback<C> callback;
public WebSocketFrame frame;
}
private LinkedList<Write<?>> writes = new LinkedList<>();
public void assertFrameCount(int expectedCount)
{
Assert.assertThat("Captured frame count",writes.size(),is(expectedCount));
}
public void assertHasFrame(OpCode op)
{
Assert.assertThat(op.name(),getFrameCount(op),greaterThanOrEqualTo(1));
}
public void assertHasFrame(OpCode op, int expectedCount)
{
Assert.assertThat(op.name(),getFrameCount(op),is(expectedCount));
}
public void assertHasNoFrames()
{
Assert.assertThat("Has no frames",writes.size(),is(0));
}
public void dump()
{
System.out.printf("Captured %d outgoing writes%n",writes.size());
for (int i = 0; i < writes.size(); i++)
{
Write<?> write = writes.get(i);
System.out.printf("[%3d] %s | %s | %s%n",i,write.context,write.callback,write.frame);
System.out.printf(" %s%n",BufferUtil.toDetailString(write.frame.getPayload()));
}
}
public int getFrameCount(OpCode op)
{
int count = 0;
for (Write<?> write : writes)
{
WebSocketFrame frame = write.frame;
if (frame.getOpCode() == op)
{
count++;
}
}
return count;
}
public LinkedList<Write<?>> getWrites()
{
return writes;
}
@Override
public <C> void output(C context, Callback<C> callback, WebSocketFrame frame)
{
Write<C> write = new Write<C>();
write.context = context;
write.callback = callback;
write.frame = frame;
writes.add(write);
}
}

View File

@ -1,13 +1,17 @@
org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.io.LEVEL=WARN org.eclipse.jetty.io.LEVEL=WARN
org.eclipse.jetty.server.LEVEL=WARN org.eclipse.jetty.server.LEVEL=WARN
org.eclipse.jetty.websocket.LEVEL=WARN # org.eclipse.jetty.websocket.LEVEL=WARN
org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# See the read/write traffic
# org.eclipse.jetty.websocket.io.Frames.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.LEVEL=DEBUG # org.eclipse.jetty.websocket.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.io.WebSocketAsyncConnection.LEVEL=DEBUG
# org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG # org.eclipse.jetty.util.thread.QueuedThreadPool.LEVEL=DEBUG
# org.eclipse.jetty.io.SelectorManager.LEVEL=INFO # org.eclipse.jetty.io.SelectorManager.LEVEL=INFO
# org.eclipse.jetty.websocket.LEVEL=DEBUG # org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.driver.WebSocketEventDriver.LEVEL=DEBUG
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG # org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO # org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=INFO # org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=INFO # org.eclipse.jetty.websocket.server.blockhead.LEVEL=INFO