421697 - IteratingCallback improvements

Cleanup WebSocket impl first

+ Logic for size on Generator was backwards.
+ Logic in Generator for RSV flags was incorrect.
+ Generalizing flagsInUse for Parser too
This commit is contained in:
Greg Wilkins 2013-11-21 12:40:32 +11:00
parent e92f44ed73
commit 41f60bd152
6 changed files with 172 additions and 96 deletions

View File

@ -64,12 +64,17 @@ public class Generator
private final ByteBufferPool bufferPool;
private final boolean validating;
/** Is there an extension using RSV1 */
private boolean rsv1InUse = false;
/** Is there an extension using RSV2 */
private boolean rsv2InUse = false;
/** Is there an extension using RSV3 */
private boolean rsv3InUse = false;
/**
* Are any flags in use
* <p>
*
* <pre>
* 0100_0000 (0x40) = rsv1
* 0010_0000 (0x20) = rsv2
* 0001_0000 (0x10) = rsv3
* </pre>
*/
private byte flagsInUse=0x00;
/**
* Construct Generator with provided policy and bufferPool
@ -114,17 +119,17 @@ public class Generator
* MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the negotiated
* extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
*/
if (!rsv1InUse && frame.isRsv1())
if (frame.isRsv1() && !isRsv1InUse())
{
throw new ProtocolException("RSV1 not allowed to be set");
}
if (!rsv2InUse && frame.isRsv2())
if (frame.isRsv2() && !isRsv2InUse())
{
throw new ProtocolException("RSV2 not allowed to be set");
}
if (!rsv3InUse && frame.isRsv3())
if (frame.isRsv3() && !isRsv3InUse())
{
throw new ProtocolException("RSV3 not allowed to be set");
}
@ -161,30 +166,27 @@ public class Generator
}
}
}
}
public void configureFromExtensions(List<? extends Extension> exts)
{
// default
this.rsv1InUse = false;
this.rsv2InUse = false;
this.rsv3InUse = false;
flagsInUse = 0x00;
// configure from list of extensions in use
for (Extension ext : exts)
{
if (ext.isRsv1User())
{
this.rsv1InUse = true;
flagsInUse = (byte)(flagsInUse | 0x40);
}
if (ext.isRsv2User())
{
this.rsv2InUse = true;
flagsInUse = (byte)(flagsInUse | 0x20);
}
if (ext.isRsv3User())
{
this.rsv3InUse = true;
flagsInUse = (byte)(flagsInUse | 0x10);
}
}
}
@ -200,14 +202,15 @@ public class Generator
/*
* start the generation process
*/
byte b;
byte b = 0x00;
// Setup fin thru opcode
b = 0x00;
if (frame.isFin())
{
b |= 0x80; // 1000_0000
}
// Set the flags
if (frame.isRsv1())
{
b |= 0x40; // 0100_0000
@ -220,7 +223,7 @@ public class Generator
{
b |= 0x10; // 0001_0000
}
// NOTE: using .getOpCode() here, not .getType().getOpCode() for testing reasons
byte opcode = frame.getOpCode();
@ -235,8 +238,7 @@ public class Generator
buffer.put(b);
// is masked
b = 0x00;
b |= (frame.isMasked()?0x80:0x00);
b = (frame.isMasked()?(byte)0x80:(byte)0x00);
// payload lengths
int payloadLength = frame.getPayloadLength();
@ -252,16 +254,16 @@ public class Generator
buffer.put((byte)0); //
buffer.put((byte)0); // anything over an
buffer.put((byte)0); // int is just
buffer.put((byte)0); // intsane!
buffer.put((byte)0); // insane!
buffer.put((byte)((payloadLength >> 24) & 0xFF));
buffer.put((byte)((payloadLength >> 16) & 0xFF));
buffer.put((byte)((payloadLength >> 8) & 0xFF));
buffer.put((byte)(payloadLength & 0xFF));
}
/*
* if payload is greater 126 we have a 7 + 16 bit length
* if payload is greater that 126 we have a 7 + 16 bit length
*/
else if (payloadLength >= 0x7E)
else if (payloadLength >= 0x7E )
{
b |= 0x7E;
buffer.put(b); // indicate 2 byte length
@ -366,19 +368,34 @@ public class Generator
return buffer;
}
public void setRsv1InUse(boolean rsv1InUse)
{
flagsInUse = (byte)((flagsInUse & 0xBF) | (rsv1InUse?0x40:0x00));
}
public void setRsv2InUse(boolean rsv2InUse)
{
flagsInUse = (byte)((flagsInUse & 0xDF) | (rsv2InUse?0x20:0x00));
}
public void setRsv3InUse(boolean rsv3InUse)
{
flagsInUse = (byte)((flagsInUse & 0xEF) | (rsv3InUse?0x10:0x00));
}
public boolean isRsv1InUse()
{
return rsv1InUse;
return (flagsInUse & 0x40) != 0;
}
public boolean isRsv2InUse()
{
return rsv2InUse;
return (flagsInUse & 0x20) != 0;
}
public boolean isRsv3InUse()
{
return rsv3InUse;
return (flagsInUse & 0x10) != 0;
}
@Override
@ -391,15 +408,15 @@ public class Generator
{
builder.append(",validating");
}
if (rsv1InUse)
if (isRsv1InUse())
{
builder.append(",+rsv1");
}
if (rsv2InUse)
if (isRsv2InUse())
{
builder.append(",+rsv2");
}
if (rsv3InUse)
if (isRsv3InUse())
{
builder.append(",+rsv3");
}

View File

@ -51,7 +51,6 @@ public class Parser
private enum State
{
START,
FINOP,
PAYLOAD_LEN,
PAYLOAD_LEN_BYTES,
MASK,
@ -76,13 +75,18 @@ public class Parser
private PayloadProcessor maskProcessor = new DeMaskProcessor();
// private PayloadProcessor strictnessProcessor;
/** Is there an extension using RSV1 */
private boolean rsv1InUse = false;
/** Is there an extension using RSV2 */
private boolean rsv2InUse = false;
/** Is there an extension using RSV3 */
private boolean rsv3InUse = false;
/**
* Is there an extension using RSV flag?
* <p>
*
* <pre>
* 0100_0000 (0x40) = rsv1
* 0010_0000 (0x20) = rsv2
* 0001_0000 (0x10) = rsv3
* </pre>
*/
private byte flagsInUse=0x00;
private IncomingFrames incomingFramesHandler;
public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool)
@ -131,26 +135,24 @@ public class Parser
}
public void configureFromExtensions(List<? extends Extension> exts)
{
{
// default
this.rsv1InUse = false;
this.rsv2InUse = false;
this.rsv3InUse = false;
flagsInUse = 0x00;
// configure from list of extensions in use
for (Extension ext : exts)
{
if (ext.isRsv1User())
{
this.rsv1InUse = true;
flagsInUse = (byte)(flagsInUse | 0x40);
}
if (ext.isRsv2User())
{
this.rsv2InUse = true;
flagsInUse = (byte)(flagsInUse | 0x20);
}
if (ext.isRsv3User())
{
this.rsv3InUse = true;
flagsInUse = (byte)(flagsInUse | 0x10);
}
}
}
@ -167,17 +169,17 @@ public class Parser
public boolean isRsv1InUse()
{
return rsv1InUse;
return (flagsInUse & 0x40) != 0;
}
public boolean isRsv2InUse()
{
return rsv2InUse;
return (flagsInUse & 0x20) != 0;
}
public boolean isRsv3InUse()
{
return rsv3InUse;
return (flagsInUse & 0x10) != 0;
}
protected void notifyFrame(final Frame f)
@ -294,18 +296,11 @@ public class Parser
{
frame.reset();
}
state = State.FINOP;
break;
}
case FINOP:
{
// peek at byte
byte b = buffer.get();
boolean fin = ((b & 0x80) != 0);
boolean rsv1 = ((b & 0x40) != 0);
boolean rsv2 = ((b & 0x20) != 0);
boolean rsv3 = ((b & 0x10) != 0);
byte opc = (byte)(b & 0x0F);
byte opcode = opc;
@ -313,37 +308,23 @@ public class Parser
{
throw new ProtocolException("Unknown opcode: " + opc);
}
if (LOG.isDebugEnabled())
{
LOG.debug("OpCode {}, fin={} rsv={}{}{}",OpCode.name(opcode),fin,(rsv1?'1':'.'),(rsv2?'1':'.'),(rsv3?'1':'.'));
}
/*
* RFC 6455 Section 5.2
*
* MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the
* negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
*/
if (!rsv1InUse && rsv1)
{
throw new ProtocolException("RSV1 not allowed to be set");
}
if (!rsv2InUse && rsv2)
{
throw new ProtocolException("RSV2 not allowed to be set");
}
if (!rsv3InUse && rsv3)
{
throw new ProtocolException("RSV3 not allowed to be set");
LOG.debug("OpCode {}, fin={} rsv={}{}{}",
OpCode.name(opcode),
fin,
(isRsv1InUse()?'1':'.'),
(isRsv2InUse()?'1':'.'),
(isRsv3InUse()?'1':'.'));
}
// base framing flags
switch(opcode) {
switch(opcode)
{
case OpCode.TEXT:
frame = new TextFrame();
lastDataOpcode = opcode;
// data validation
if ((priorDataFrame != null) && (!priorDataFrame.isFin()))
{
@ -352,6 +333,7 @@ public class Parser
break;
case OpCode.BINARY:
frame = new BinaryFrame();
lastDataOpcode = opcode;
// data validation
if ((priorDataFrame != null) && (!priorDataFrame.isFin()))
{
@ -360,6 +342,7 @@ public class Parser
break;
case OpCode.CONTINUATION:
frame = new ContinuationFrame();
lastDataOpcode = opcode;
// continuation validation
if (priorDataFrame == null)
{
@ -395,18 +378,50 @@ public class Parser
}
frame.setFin(fin);
frame.setRsv1(rsv1);
frame.setRsv2(rsv2);
frame.setRsv3(rsv3);
if (frame.isDataFrame())
// Are any flags set?
if ((b & 0x70) != 0)
{
lastDataOpcode = opcode;
/*
* RFC 6455 Section 5.2
*
* MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the
* negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_.
*/
if ((b & 0x40) != 0)
{
if (isRsv1InUse())
frame.setRsv1(true);
else
throw new ProtocolException("RSV1 not allowed to be set");
}
if ((b & 0x20) != 0)
{
if (isRsv2InUse())
frame.setRsv2(true);
else
throw new ProtocolException("RSV2 not allowed to be set");
}
if ((b & 0x10) != 0)
{
if (isRsv3InUse())
frame.setRsv3(true);
else
throw new ProtocolException("RSV3 not allowed to be set");
}
}
else
{
if (LOG.isDebugEnabled())
{
LOG.debug("OpCode {}, fin={} rsv=000",OpCode.name(opcode),fin);
}
}
state = State.PAYLOAD_LEN;
break;
}
case PAYLOAD_LEN:
{
byte b = buffer.get();
@ -450,6 +465,7 @@ public class Parser
break;
}
case PAYLOAD_LEN_BYTES:
{
byte b = buffer.get();
@ -477,6 +493,7 @@ public class Parser
}
break;
}
case MASK:
{
byte m[] = new byte[4];
@ -501,6 +518,7 @@ public class Parser
}
break;
}
case MASK_BYTES:
{
byte b = buffer.get();
@ -520,6 +538,7 @@ public class Parser
}
break;
}
case PAYLOAD:
{
if (parsePayload(buffer))
@ -549,13 +568,13 @@ public class Parser
* @return true if payload is done reading, false if incomplete
*/
private boolean parsePayload(ByteBuffer buffer)
{
{
if (payloadLength == 0)
{
return true;
}
while (buffer.hasRemaining())
if (buffer.hasRemaining())
{
if (payload == null)
{

View File

@ -121,19 +121,19 @@ public class WriteBytesProvider implements Callback
/** Flush callback, for notifying when a flush should be performed */
private final Callback flushCallback;
/** Backlog of frames */
private LinkedList<FrameEntry> queue;
private final LinkedList<FrameEntry> queue;
/** the buffer input size */
private int bufferSize = 2048;
/** the gathered write bytebuffer array limit */
private int gatheredBufferLimit = 10;
/** Past Frames, not yet notified (from gathered generation/write) */
private LinkedList<FrameEntry> past;
private final LinkedList<FrameEntry> past;
/** Currently active frame */
private FrameEntry active;
/** Tracking for failure */
private Throwable failure;
/** Is WriteBytesProvider closed to more WriteBytes being enqueued? */
private AtomicBoolean closed;
private final AtomicBoolean closed;
/**
* Create a WriteBytesProvider with specified Generator and "flush" Callback.

View File

@ -105,6 +105,7 @@ public class WebSocketFrameTest
TextFrame frame = new TextFrame();
frame.setPayload("Hi");
frame.setRsv1(true);
laxGenerator.setRsv1InUse(true);
ByteBuffer actual = generateWholeFrame(laxGenerator,frame);
String expected = "C1024869";
assertFrameHex("Lax Text Frame with RSV1",expected,actual);
@ -116,6 +117,7 @@ public class WebSocketFrameTest
TextFrame frame = new TextFrame();
frame.setPayload("Hi");
frame.setRsv2(true);
laxGenerator.setRsv2InUse(true);
ByteBuffer actual = generateWholeFrame(laxGenerator,frame);
String expected = "A1024869";
assertFrameHex("Lax Text Frame with RSV2",expected,actual);
@ -127,6 +129,7 @@ public class WebSocketFrameTest
TextFrame frame = new TextFrame();
frame.setPayload("Hi");
frame.setRsv3(true);
laxGenerator.setRsv3InUse(true);
ByteBuffer actual = generateWholeFrame(laxGenerator,frame);
String expected = "91024869";
assertFrameHex("Lax Text Frame with RSV3",expected,actual);

View File

@ -43,6 +43,8 @@ import org.junit.Test;
@Ignore("Need to rewrite this")
public class LoadTest
{
private final static int loops = 50;
@SuppressWarnings("serial")
public static class LoadServlet extends WebSocketServlet
{
@ -129,6 +131,41 @@ public class LoadTest
server.stop();
}
@Test
public void testA() throws Exception
{
for (int i=0;i<loops;i++)
testManyMessages();
}
@Test
public void testB() throws Exception
{
for (int i=0;i<loops;i++)
testManyMessages();
}
@Test
public void testC() throws Exception
{
for (int i=0;i<loops;i++)
testManyMessages();
}
@Test
public void testD() throws Exception
{
for (int i=0;i<loops;i++)
testManyMessages();
}
@Test
public void testE() throws Exception
{
for (int i=0;i<loops;i++)
testManyMessages();
}
@Test
public void testManyMessages() throws Exception
{
@ -140,13 +177,13 @@ public class LoadTest
client.sendStandardRequest();
client.expectUpgradeResponse();
int iterations = 2000;
int iterations = 10000;
LoadSocket.count.set(0);
threadPool.execute(new TextGen(client,iterations));
client.readFrames(iterations,TimeUnit.SECONDS,10);
client.readFrames(iterations,TimeUnit.SECONDS,40);
}
finally
{

View File

@ -2,7 +2,7 @@ org.eclipse.jetty.util.log.class=org.eclipse.jetty.util.log.StdErrLog
org.eclipse.jetty.LEVEL=WARN
# org.eclipse.jetty.io.WriteFlusher.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=DEBUG
org.eclipse.jetty.websocket.LEVEL=DEBUG
# org.eclipse.jetty.websocket.LEVEL=WARN
# org.eclipse.jetty.websocket.common.io.LEVEL=DEBUG
# org.eclipse.jetty.websocket.server.ab.LEVEL=DEBUG