From 41f60bd15281a76e91b83940890dddf31f5be465 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 21 Nov 2013 12:40:32 +1100 Subject: [PATCH] 421697 - IteratingCallback improvements Cleanup WebSocket impl first + Logic for size on Generator was backwards. + Logic in Generator for RSV flags was incorrect. + Generalizing flagsInUse for Parser too --- .../jetty/websocket/common/Generator.java | 79 ++++++---- .../jetty/websocket/common/Parser.java | 137 ++++++++++-------- .../common/io/WriteBytesProvider.java | 6 +- .../websocket/common/WebSocketFrameTest.java | 3 + .../jetty/websocket/server/LoadTest.java | 41 +++++- .../test/resources/jetty-logging.properties | 2 +- 6 files changed, 172 insertions(+), 96 deletions(-) diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java index b431dbbff10..34df8c7aba8 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Generator.java @@ -64,12 +64,17 @@ public class Generator private final ByteBufferPool bufferPool; private final boolean validating; - /** Is there an extension using RSV1 */ - private boolean rsv1InUse = false; - /** Is there an extension using RSV2 */ - private boolean rsv2InUse = false; - /** Is there an extension using RSV3 */ - private boolean rsv3InUse = false; + /** + * Are any flags in use + *

+ * + *

+     *   0100_0000 (0x40) = rsv1
+     *   0010_0000 (0x20) = rsv2
+     *   0001_0000 (0x10) = rsv3
+     * 
+ */ + private byte flagsInUse=0x00; /** * Construct Generator with provided policy and bufferPool @@ -114,17 +119,17 @@ public class Generator * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the negotiated * extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_. */ - if (!rsv1InUse && frame.isRsv1()) + if (frame.isRsv1() && !isRsv1InUse()) { throw new ProtocolException("RSV1 not allowed to be set"); } - if (!rsv2InUse && frame.isRsv2()) + if (frame.isRsv2() && !isRsv2InUse()) { throw new ProtocolException("RSV2 not allowed to be set"); } - if (!rsv3InUse && frame.isRsv3()) + if (frame.isRsv3() && !isRsv3InUse()) { throw new ProtocolException("RSV3 not allowed to be set"); } @@ -161,30 +166,27 @@ public class Generator } } } - } public void configureFromExtensions(List exts) { // default - this.rsv1InUse = false; - this.rsv2InUse = false; - this.rsv3InUse = false; + flagsInUse = 0x00; // configure from list of extensions in use for (Extension ext : exts) { if (ext.isRsv1User()) { - this.rsv1InUse = true; + flagsInUse = (byte)(flagsInUse | 0x40); } if (ext.isRsv2User()) { - this.rsv2InUse = true; + flagsInUse = (byte)(flagsInUse | 0x20); } if (ext.isRsv3User()) { - this.rsv3InUse = true; + flagsInUse = (byte)(flagsInUse | 0x10); } } } @@ -200,14 +202,15 @@ public class Generator /* * start the generation process */ - byte b; - + byte b = 0x00; + // Setup fin thru opcode - b = 0x00; if (frame.isFin()) { b |= 0x80; // 1000_0000 } + + // Set the flags if (frame.isRsv1()) { b |= 0x40; // 0100_0000 @@ -220,7 +223,7 @@ public class Generator { b |= 0x10; // 0001_0000 } - + // NOTE: using .getOpCode() here, not .getType().getOpCode() for testing reasons byte opcode = frame.getOpCode(); @@ -235,8 +238,7 @@ public class Generator buffer.put(b); // is masked - b = 0x00; - b |= (frame.isMasked()?0x80:0x00); + b = (frame.isMasked()?(byte)0x80:(byte)0x00); // payload lengths int payloadLength = frame.getPayloadLength(); @@ -252,16 +254,16 @@ public class Generator buffer.put((byte)0); // buffer.put((byte)0); // anything over an buffer.put((byte)0); // int is just - buffer.put((byte)0); // intsane! + buffer.put((byte)0); // insane! buffer.put((byte)((payloadLength >> 24) & 0xFF)); buffer.put((byte)((payloadLength >> 16) & 0xFF)); buffer.put((byte)((payloadLength >> 8) & 0xFF)); buffer.put((byte)(payloadLength & 0xFF)); } /* - * if payload is greater 126 we have a 7 + 16 bit length + * if payload is greater that 126 we have a 7 + 16 bit length */ - else if (payloadLength >= 0x7E) + else if (payloadLength >= 0x7E ) { b |= 0x7E; buffer.put(b); // indicate 2 byte length @@ -366,19 +368,34 @@ public class Generator return buffer; } + public void setRsv1InUse(boolean rsv1InUse) + { + flagsInUse = (byte)((flagsInUse & 0xBF) | (rsv1InUse?0x40:0x00)); + } + + public void setRsv2InUse(boolean rsv2InUse) + { + flagsInUse = (byte)((flagsInUse & 0xDF) | (rsv2InUse?0x20:0x00)); + } + + public void setRsv3InUse(boolean rsv3InUse) + { + flagsInUse = (byte)((flagsInUse & 0xEF) | (rsv3InUse?0x10:0x00)); + } + public boolean isRsv1InUse() { - return rsv1InUse; + return (flagsInUse & 0x40) != 0; } public boolean isRsv2InUse() { - return rsv2InUse; + return (flagsInUse & 0x20) != 0; } public boolean isRsv3InUse() { - return rsv3InUse; + return (flagsInUse & 0x10) != 0; } @Override @@ -391,15 +408,15 @@ public class Generator { builder.append(",validating"); } - if (rsv1InUse) + if (isRsv1InUse()) { builder.append(",+rsv1"); } - if (rsv2InUse) + if (isRsv2InUse()) { builder.append(",+rsv2"); } - if (rsv3InUse) + if (isRsv3InUse()) { builder.append(",+rsv3"); } diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java index 46c4f7c2535..a53dffb0d6a 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/Parser.java @@ -51,7 +51,6 @@ public class Parser private enum State { START, - FINOP, PAYLOAD_LEN, PAYLOAD_LEN_BYTES, MASK, @@ -76,13 +75,18 @@ public class Parser private PayloadProcessor maskProcessor = new DeMaskProcessor(); // private PayloadProcessor strictnessProcessor; - /** Is there an extension using RSV1 */ - private boolean rsv1InUse = false; - /** Is there an extension using RSV2 */ - private boolean rsv2InUse = false; - /** Is there an extension using RSV3 */ - private boolean rsv3InUse = false; - + /** + * Is there an extension using RSV flag? + *

+ * + *

+     *   0100_0000 (0x40) = rsv1
+     *   0010_0000 (0x20) = rsv2
+     *   0001_0000 (0x10) = rsv3
+     * 
+ */ + private byte flagsInUse=0x00; + private IncomingFrames incomingFramesHandler; public Parser(WebSocketPolicy wspolicy, ByteBufferPool bufferPool) @@ -131,26 +135,24 @@ public class Parser } public void configureFromExtensions(List exts) - { + { // default - this.rsv1InUse = false; - this.rsv2InUse = false; - this.rsv3InUse = false; + flagsInUse = 0x00; // configure from list of extensions in use for (Extension ext : exts) { if (ext.isRsv1User()) { - this.rsv1InUse = true; + flagsInUse = (byte)(flagsInUse | 0x40); } if (ext.isRsv2User()) { - this.rsv2InUse = true; + flagsInUse = (byte)(flagsInUse | 0x20); } if (ext.isRsv3User()) { - this.rsv3InUse = true; + flagsInUse = (byte)(flagsInUse | 0x10); } } } @@ -167,17 +169,17 @@ public class Parser public boolean isRsv1InUse() { - return rsv1InUse; + return (flagsInUse & 0x40) != 0; } public boolean isRsv2InUse() { - return rsv2InUse; + return (flagsInUse & 0x20) != 0; } public boolean isRsv3InUse() { - return rsv3InUse; + return (flagsInUse & 0x10) != 0; } protected void notifyFrame(final Frame f) @@ -294,18 +296,11 @@ public class Parser { frame.reset(); } - - state = State.FINOP; - break; - } - case FINOP: - { + // peek at byte byte b = buffer.get(); boolean fin = ((b & 0x80) != 0); - boolean rsv1 = ((b & 0x40) != 0); - boolean rsv2 = ((b & 0x20) != 0); - boolean rsv3 = ((b & 0x10) != 0); + byte opc = (byte)(b & 0x0F); byte opcode = opc; @@ -313,37 +308,23 @@ public class Parser { throw new ProtocolException("Unknown opcode: " + opc); } - + if (LOG.isDebugEnabled()) { - LOG.debug("OpCode {}, fin={} rsv={}{}{}",OpCode.name(opcode),fin,(rsv1?'1':'.'),(rsv2?'1':'.'),(rsv3?'1':'.')); - } - - /* - * RFC 6455 Section 5.2 - * - * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the - * negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_. - */ - if (!rsv1InUse && rsv1) - { - throw new ProtocolException("RSV1 not allowed to be set"); - } - - if (!rsv2InUse && rsv2) - { - throw new ProtocolException("RSV2 not allowed to be set"); - } - - if (!rsv3InUse && rsv3) - { - throw new ProtocolException("RSV3 not allowed to be set"); + LOG.debug("OpCode {}, fin={} rsv={}{}{}", + OpCode.name(opcode), + fin, + (isRsv1InUse()?'1':'.'), + (isRsv2InUse()?'1':'.'), + (isRsv3InUse()?'1':'.')); } // base framing flags - switch(opcode) { + switch(opcode) + { case OpCode.TEXT: frame = new TextFrame(); + lastDataOpcode = opcode; // data validation if ((priorDataFrame != null) && (!priorDataFrame.isFin())) { @@ -352,6 +333,7 @@ public class Parser break; case OpCode.BINARY: frame = new BinaryFrame(); + lastDataOpcode = opcode; // data validation if ((priorDataFrame != null) && (!priorDataFrame.isFin())) { @@ -360,6 +342,7 @@ public class Parser break; case OpCode.CONTINUATION: frame = new ContinuationFrame(); + lastDataOpcode = opcode; // continuation validation if (priorDataFrame == null) { @@ -395,18 +378,50 @@ public class Parser } frame.setFin(fin); - frame.setRsv1(rsv1); - frame.setRsv2(rsv2); - frame.setRsv3(rsv3); - if (frame.isDataFrame()) + // Are any flags set? + if ((b & 0x70) != 0) { - lastDataOpcode = opcode; + /* + * RFC 6455 Section 5.2 + * + * MUST be 0 unless an extension is negotiated that defines meanings for non-zero values. If a nonzero value is received and none of the + * negotiated extensions defines the meaning of such a nonzero value, the receiving endpoint MUST _Fail the WebSocket Connection_. + */ + if ((b & 0x40) != 0) + { + if (isRsv1InUse()) + frame.setRsv1(true); + else + throw new ProtocolException("RSV1 not allowed to be set"); + } + if ((b & 0x20) != 0) + { + if (isRsv2InUse()) + frame.setRsv2(true); + else + throw new ProtocolException("RSV2 not allowed to be set"); + } + if ((b & 0x10) != 0) + { + if (isRsv3InUse()) + frame.setRsv3(true); + else + throw new ProtocolException("RSV3 not allowed to be set"); + } } - + else + { + if (LOG.isDebugEnabled()) + { + LOG.debug("OpCode {}, fin={} rsv=000",OpCode.name(opcode),fin); + } + } + state = State.PAYLOAD_LEN; break; } + case PAYLOAD_LEN: { byte b = buffer.get(); @@ -450,6 +465,7 @@ public class Parser break; } + case PAYLOAD_LEN_BYTES: { byte b = buffer.get(); @@ -477,6 +493,7 @@ public class Parser } break; } + case MASK: { byte m[] = new byte[4]; @@ -501,6 +518,7 @@ public class Parser } break; } + case MASK_BYTES: { byte b = buffer.get(); @@ -520,6 +538,7 @@ public class Parser } break; } + case PAYLOAD: { if (parsePayload(buffer)) @@ -549,13 +568,13 @@ public class Parser * @return true if payload is done reading, false if incomplete */ private boolean parsePayload(ByteBuffer buffer) - { + { if (payloadLength == 0) { return true; } - while (buffer.hasRemaining()) + if (buffer.hasRemaining()) { if (payload == null) { diff --git a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java index bfe683bf5cb..cbc1169e452 100644 --- a/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java +++ b/jetty-websocket/websocket-common/src/main/java/org/eclipse/jetty/websocket/common/io/WriteBytesProvider.java @@ -121,19 +121,19 @@ public class WriteBytesProvider implements Callback /** Flush callback, for notifying when a flush should be performed */ private final Callback flushCallback; /** Backlog of frames */ - private LinkedList queue; + private final LinkedList queue; /** the buffer input size */ private int bufferSize = 2048; /** the gathered write bytebuffer array limit */ private int gatheredBufferLimit = 10; /** Past Frames, not yet notified (from gathered generation/write) */ - private LinkedList past; + private final LinkedList past; /** Currently active frame */ private FrameEntry active; /** Tracking for failure */ private Throwable failure; /** Is WriteBytesProvider closed to more WriteBytes being enqueued? */ - private AtomicBoolean closed; + private final AtomicBoolean closed; /** * Create a WriteBytesProvider with specified Generator and "flush" Callback. diff --git a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java index 85a7f77b68e..1241d551964 100644 --- a/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java +++ b/jetty-websocket/websocket-common/src/test/java/org/eclipse/jetty/websocket/common/WebSocketFrameTest.java @@ -105,6 +105,7 @@ public class WebSocketFrameTest TextFrame frame = new TextFrame(); frame.setPayload("Hi"); frame.setRsv1(true); + laxGenerator.setRsv1InUse(true); ByteBuffer actual = generateWholeFrame(laxGenerator,frame); String expected = "C1024869"; assertFrameHex("Lax Text Frame with RSV1",expected,actual); @@ -116,6 +117,7 @@ public class WebSocketFrameTest TextFrame frame = new TextFrame(); frame.setPayload("Hi"); frame.setRsv2(true); + laxGenerator.setRsv2InUse(true); ByteBuffer actual = generateWholeFrame(laxGenerator,frame); String expected = "A1024869"; assertFrameHex("Lax Text Frame with RSV2",expected,actual); @@ -127,6 +129,7 @@ public class WebSocketFrameTest TextFrame frame = new TextFrame(); frame.setPayload("Hi"); frame.setRsv3(true); + laxGenerator.setRsv3InUse(true); ByteBuffer actual = generateWholeFrame(laxGenerator,frame); String expected = "91024869"; assertFrameHex("Lax Text Frame with RSV3",expected,actual); diff --git a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java index 142edcee31e..47e901ead52 100644 --- a/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java +++ b/jetty-websocket/websocket-server/src/test/java/org/eclipse/jetty/websocket/server/LoadTest.java @@ -43,6 +43,8 @@ import org.junit.Test; @Ignore("Need to rewrite this") public class LoadTest { + private final static int loops = 50; + @SuppressWarnings("serial") public static class LoadServlet extends WebSocketServlet { @@ -129,6 +131,41 @@ public class LoadTest server.stop(); } + @Test + public void testA() throws Exception + { + for (int i=0;i