Merge branch 'jetty-9' into jetty-9-oneconnector

This commit is contained in:
Greg Wilkins 2012-08-03 06:33:26 +10:00
commit f6d84d9f91
15 changed files with 136 additions and 33 deletions

View File

@ -199,6 +199,10 @@ public class WebSocketEventDriver implements IncomingFrames
BufferUtil.put(frame.getPayload(),pongBuf);
BufferUtil.flipToFlush(pongBuf,0);
pong.setPayload(pongBuf);
if (LOG.isDebugEnabled())
{
LOG.debug("Pong with {}",BufferUtil.toDetailString(pongBuf));
}
}
session.output("pong",new FutureCallback<String>(),pong);
break;

View File

@ -60,6 +60,11 @@ public final class OpCode
*/
public static final byte PONG = (byte)0x0A;
/**
* An undefined OpCode
*/
public static final byte UNDEFINED = (byte)-1;
public static boolean isControlFrame(byte opcode)
{
return (opcode >= CLOSE);

View File

@ -164,7 +164,7 @@ public class Parser
incomingFramesHandler.incoming(e);
}
public void parse(ByteBuffer buffer)
public synchronized void parse(ByteBuffer buffer)
{
if (buffer.remaining() <= 0)
{

View File

@ -105,7 +105,7 @@ public class WebSocketFrame implements Frame
*/
public WebSocketFrame()
{
reset();
this(OpCode.UNDEFINED);
}
/**
@ -113,7 +113,6 @@ public class WebSocketFrame implements Frame
*/
public WebSocketFrame(byte opcode)
{
reset();
this.opcode = opcode;
}

View File

@ -3,15 +3,91 @@ package org.eclipse.jetty.websocket.protocol;
import static org.hamcrest.Matchers.*;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.websocket.ByteBufferAssert;
import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.api.WebSocketPolicy;
import org.junit.Assert;
import org.junit.Test;
public class GeneratorTest
{
/**
* Prevent regression of masking of many packets.
*/
@Test
public void testManyMasked()
{
byte[] MASK =
{ 0x11, 0x22, 0x33, 0x44 };
int pingCount = 1000;
// the generator
Generator generator = new UnitGenerator();
// Prepare frames
List<WebSocketFrame> send = new ArrayList<>();
for (int i = 0; i < pingCount; i++)
{
String payload = String.format("ping-%d[%X]",i,i);
send.add(WebSocketFrame.ping().setPayload(payload));
}
send.add(new CloseInfo(StatusCode.NORMAL).asFrame());
// Generate into single bytebuffer
int buflen = 0;
for (WebSocketFrame f : send)
{
buflen += f.getPayloadLength() + Generator.OVERHEAD;
}
ByteBuffer completeBuf = ByteBuffer.allocate(buflen);
BufferUtil.clearToFill(completeBuf);
// Generate frames
for (WebSocketFrame f : send)
{
f.setMask(MASK); // make sure we have mask set
ByteBuffer slice = f.getPayload().slice();
BufferUtil.put(generator.generate(f),completeBuf);
f.setPayload(slice);
}
BufferUtil.flipToFlush(completeBuf,0);
// Parse complete buffer (5 bytes at a time)
WebSocketPolicy policy = WebSocketPolicy.newServerPolicy();
Parser parser = new Parser(policy);
IncomingFramesCapture capture = new IncomingFramesCapture();
parser.setIncomingFramesHandler(capture);
int segmentSize = 5;
while (completeBuf.remaining() > 0)
{
ByteBuffer part = completeBuf.slice();
int len = Math.min(segmentSize,part.remaining());
part.limit(part.position() + len);
parser.parse(part);
completeBuf.position(completeBuf.position() + len);
}
// Assert validity of frame
int frameCount = send.size();
capture.assertFrameCount(frameCount);
for (int i = 0; i < frameCount; i++)
{
WebSocketFrame actual = capture.getFrames().get(i);
WebSocketFrame expected = send.get(i);
String prefix = "Frame[" + i + "]";
Assert.assertThat(prefix + ".opcode",actual.getOpCode(),is(expected.getOpCode()));
Assert.assertThat(prefix + ".payloadLength",actual.getPayloadLength(),is(expected.getPayloadLength()));
ByteBufferAssert.assertEquals(prefix + ".payload",expected.getPayload(),actual.getPayload());
}
}
/**
* Test the windowed generate of a frame that has no masking.
*/

View File

@ -24,6 +24,8 @@ import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.websocket.api.WebSocketException;
import org.eclipse.jetty.websocket.io.IncomingFrames;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Assert;
public class IncomingFramesCapture implements IncomingFrames
@ -69,19 +71,20 @@ public class IncomingFramesCapture implements IncomingFrames
public void dump()
{
System.out.printf("Captured %d incoming frames%n",frames.size());
System.err.printf("Captured %d incoming frames%n",frames.size());
for (int i = 0; i < frames.size(); i++)
{
WebSocketFrame frame = frames.get(i);
System.out.printf("[%3d] %s%n",i,frame);
System.out.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
System.err.printf("[%3d] %s%n",i,frame);
System.err.printf(" %s%n",BufferUtil.toDetailString(frame.getPayload()));
}
}
public int getErrorCount(Class<? extends WebSocketException> errorType)
{
int count = 0;
for(WebSocketException error: errors) {
for (WebSocketException error : errors)
{
if (errorType.isInstance(error))
{
count++;
@ -98,7 +101,8 @@ public class IncomingFramesCapture implements IncomingFrames
public int getFrameCount(byte op)
{
int count = 0;
for(WebSocketFrame frame: frames) {
for (WebSocketFrame frame : frames)
{
if (frame.getOpCode() == op)
{
count++;
@ -122,7 +126,8 @@ public class IncomingFramesCapture implements IncomingFrames
@Override
public void incoming(WebSocketFrame frame)
{
frames.add(frame);
WebSocketFrame copy = new WebSocketFrame(frame);
frames.add(copy);
}
public int size()

View File

@ -47,7 +47,7 @@ public class DeflateExtensionTest
}
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testDeflateFrameExtension() throws Exception
{
BlockheadClient client = new BlockheadClient(server.getServerUri());

View File

@ -10,6 +10,8 @@ import org.eclipse.jetty.websocket.protocol.Generator;
import org.eclipse.jetty.websocket.server.SimpleServletServer;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.rules.TestName;
public abstract class AbstractABCase
{
@ -65,6 +67,9 @@ public abstract class AbstractABCase
return ret.toString();
}
@Rule
public TestName testname = new TestName();
public Generator getLaxGenerator()
{
return laxGenerator;

View File

@ -41,6 +41,7 @@ public class Fuzzer
private final BlockheadClient client;
private final Generator generator;
private final String testname;
private SendMode sendMode = SendMode.BULK;
private int slowSendSegmentSize = 5;
@ -48,6 +49,7 @@ public class Fuzzer
{
this.client = new BlockheadClient(testcase.getServer().getServerUri());
this.generator = testcase.getLaxGenerator();
this.testname = testcase.testname.getMethodName();
}
public ByteBuffer asNetworkBuffer(List<WebSocketFrame> send)
@ -91,6 +93,10 @@ public class Fuzzer
// Read frames
IncomingFramesCapture capture = client.readFrames(expect.size(),TimeUnit.MILLISECONDS,500);
if (LOG.isDebugEnabled())
{
capture.dump();
}
String prefix = "";
for (int i = 0; i < expectedCount; i++)
@ -154,7 +160,7 @@ public class Fuzzer
public void send(List<WebSocketFrame> send) throws IOException
{
Assert.assertThat("Client connected",client.isConnected(),is(true));
LOG.debug("Sending {} frames (mode {})",send.size(),sendMode);
LOG.debug("[{}] Sending {} frames (mode {})",testname,send.size(),sendMode);
if ((sendMode == SendMode.BULK) || (sendMode == SendMode.SLOW))
{
int buflen = 0;
@ -169,6 +175,10 @@ public class Fuzzer
for (WebSocketFrame f : send)
{
f.setMask(MASK); // make sure we have mask set
if (LOG.isDebugEnabled())
{
LOG.debug("payload: {}",BufferUtil.toDetailString(f.getPayload()));
}
BufferUtil.put(generator.generate(f),buf);
}
BufferUtil.flipToFlush(buf,0);

View File

@ -184,7 +184,7 @@ public class TestABCase1 extends AbstractABCase
* Echo 65535 byte TEXT message (uses medium 2 byte payload length)
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase1_1_6() throws Exception
{
byte payload[] = new byte[65535];
@ -216,7 +216,7 @@ public class TestABCase1 extends AbstractABCase
* Echo 65536 byte TEXT message (uses large 8 byte payload length)
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase1_1_7() throws Exception
{
byte payload[] = new byte[65536];
@ -252,7 +252,7 @@ public class TestABCase1 extends AbstractABCase
* This is done to test the parsing together of the frame on the server side.
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase1_1_8() throws Exception
{
byte payload[] = new byte[65536];
@ -438,7 +438,7 @@ public class TestABCase1 extends AbstractABCase
* Echo 65535 byte BINARY message (uses medium 2 byte payload length)
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase1_2_6() throws Exception
{
byte payload[] = new byte[65535];
@ -470,7 +470,7 @@ public class TestABCase1 extends AbstractABCase
* Echo 65536 byte BINARY message (uses large 8 byte payload length)
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase1_2_7() throws Exception
{
byte payload[] = new byte[65536];
@ -506,7 +506,7 @@ public class TestABCase1 extends AbstractABCase
* This is done to test the parsing together of the frame on the server side.
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase1_2_8() throws Exception
{
byte payload[] = new byte[65536];

View File

@ -9,7 +9,6 @@ import org.eclipse.jetty.websocket.api.StatusCode;
import org.eclipse.jetty.websocket.protocol.CloseInfo;
import org.eclipse.jetty.websocket.protocol.OpCode;
import org.eclipse.jetty.websocket.protocol.WebSocketFrame;
import org.junit.Ignore;
import org.junit.Test;
public class TestABCase2 extends AbstractABCase
@ -81,7 +80,6 @@ public class TestABCase2 extends AbstractABCase
* 10 pings, sent slowly
*/
@Test
@Ignore("FIXME")
public void testCase2_11() throws Exception
{
// send 10 pings (slowly) each with unique payload

View File

@ -242,7 +242,7 @@ public class TestABCase5 extends AbstractABCase
* Send text fragmented properly in 2 frames, then continuation!fin, then text unfragmented.
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase5_15() throws Exception
{
List<WebSocketFrame> send = new ArrayList<>();
@ -338,7 +338,7 @@ public class TestABCase5 extends AbstractABCase
* text message fragmented in 2 frames, both frames as opcode=TEXT
*/
@Test
@Ignore("FIXME")
@Ignore /* FIXME */
public void testCase5_18() throws Exception
{
List<WebSocketFrame> send = new ArrayList<>();

View File

@ -89,7 +89,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
private final WebSocketPolicy policy;
private final Generator generator;
private final Parser parser;
private final IncomingFramesCapture incomingFrameQueue;
private final IncomingFramesCapture incomingFrames;
private final WebSocketExtensionRegistry extensionRegistry;
private Socket socket;
@ -122,7 +122,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
parser = new Parser(policy);
parseCount = new AtomicInteger(0);
incomingFrameQueue = new IncomingFramesCapture();
incomingFrames = new IncomingFramesCapture();
extensionRegistry = new WebSocketExtensionRegistry(policy,bufferPool);
}
@ -305,7 +305,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
@Override
public void incoming(WebSocketException e)
{
incomingFrameQueue.incoming(e);
incomingFrames.incoming(e);
}
@Override
@ -318,7 +318,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
LOG.info("Client parsed {} frames",count);
}
WebSocketFrame copy = new WebSocketFrame(frame);
incomingFrameQueue.incoming(copy);
incomingFrames.incoming(copy);
}
public boolean isConnected()
@ -368,7 +368,6 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
if (frame.getOpCode() == OpCode.CLOSE)
{
// FIXME terminate the connection?
disconnect();
}
}
@ -387,7 +386,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
public IncomingFramesCapture readFrames(int expectedCount, TimeUnit timeoutUnit, int timeoutDuration) throws IOException, TimeoutException
{
LOG.debug("Read: waiting for {} frame(s) from server",expectedCount);
int startCount = incomingFrameQueue.size();
int startCount = incomingFrames.size();
ByteBuffer buf = bufferPool.acquire(policy.getBufferSize(),false);
BufferUtil.clearToFill(buf);
@ -399,7 +398,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
LOG.debug("Now: {} - expireOn: {} ({} ms)",now,expireOn,msDur);
int len = 0;
while (incomingFrameQueue.size() < (startCount + expectedCount))
while (incomingFrames.size() < (startCount + expectedCount))
{
BufferUtil.clearToFill(buf);
len = read(buf);
@ -419,9 +418,9 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
}
if (!debug && (System.currentTimeMillis() > expireOn))
{
incomingFrameQueue.dump();
incomingFrames.dump();
throw new TimeoutException(String.format("Timeout reading all %d expected frames. (managed to only read %d frame(s))",expectedCount,
incomingFrameQueue.size()));
incomingFrames.size()));
}
}
}
@ -430,7 +429,7 @@ public class BlockheadClient implements IncomingFrames, OutgoingFrames
bufferPool.release(buf);
}
return incomingFrameQueue;
return incomingFrames;
}
public String readResponseHeader() throws IOException

View File

@ -126,7 +126,8 @@ public class IncomingFramesCapture implements IncomingFrames
@Override
public void incoming(WebSocketFrame frame)
{
frames.add(frame);
WebSocketFrame copy = new WebSocketFrame(frame);
frames.add(copy);
}
public int size()

View File

@ -14,4 +14,5 @@ org.eclipse.jetty.websocket.server.helper.RFCSocket.LEVEL=OFF
# org.eclipse.jetty.websocket.extensions.LEVEL=DEBUG
# org.eclipse.jetty.websocket.protocol.Generator.LEVEL=INFO
# org.eclipse.jetty.websocket.protocol.Parser.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=INFO
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.blockhead.LEVEL=DEBUG