diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java index 77810b8546c..3a72166051d 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java @@ -148,7 +148,10 @@ public interface WebSocket byte textOpcode(); byte continuationOpcode(); byte finMask(); + String getProtocol(); + void setFakeFragments(boolean fake); + boolean isFakeFragments(); boolean isControl(byte opcode); boolean isText(byte opcode); boolean isBinary(byte opcode); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java index d599148db0b..eed3c066cf9 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java @@ -530,4 +530,16 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc { return 0; } + + public void setFakeFragments(boolean fake) + { + // TODO Auto-generated method stub + + } + + public boolean isFakeFragments() + { + // TODO Auto-generated method stub + return false; + } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java index 3373868cb57..d254007fafa 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java @@ -491,6 +491,15 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc { return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); } + + public void setFakeFragments(boolean fake) + { + } + + public boolean isFakeFragments() + { + return false; + } } /* ------------------------------------------------------------ */ diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD10.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD10.java index b7a4d7d8c8d..ba1ee8333ff 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD10.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD10.java @@ -45,7 +45,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc final static byte OP_TEXT = 0x01; final static byte OP_BINARY = 0x02; final static byte OP_EXT_DATA = 0x03; - + + final static byte OP_CONTROL = 0x08; final static byte OP_CLOSE = 0x08; final static byte OP_PING = 0x09; final static byte OP_PONG = 0x0A; @@ -60,16 +61,18 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc final static int CLOSE_NOCLOSE=1006; final static int CLOSE_NOTUTF8=1007; + final static int FLAG_FIN=0x8; + final static int VERSION=8; static boolean isLastFrame(byte flags) { - return (flags&0x8)!=0; + return (flags&FLAG_FIN)!=0; } static boolean isControlFrame(byte opcode) { - return (opcode&0x8)!=0; + return (opcode&OP_CONTROL)!=0; } private final static byte[] MAGIC; @@ -87,8 +90,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc private final String _protocol; private final int _draft; private int _close; - private boolean _closedIn; - private boolean _closedOut; + private volatile boolean _closedIn; + private volatile boolean _closedOut; private int _maxTextMessageSize; private int _maxBinaryMessageSize=-1; @@ -105,12 +108,12 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc } } - private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD07(); + private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler(); /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - private final WebSocket.FrameConnection _connection = new FrameConnectionD10(); + private final WebSocket.FrameConnection _connection = new WSFrameConnection(); /* ------------------------------------------------------------ */ @@ -341,7 +344,6 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc _close=code; } - try { if (closed) @@ -360,7 +362,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1); bytes[0]=(byte)(code/0x100); bytes[1]=(byte)(code%0x100); - _outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length); + _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length); } _outbound.flush(); @@ -390,29 +392,29 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - private class FrameConnectionD10 implements WebSocket.FrameConnection + private class WSFrameConnection implements WebSocket.FrameConnection { volatile boolean _disconnecting; int _maxTextMessage=WebSocketConnectionD10.this._maxTextMessageSize; int _maxBinaryMessage=WebSocketConnectionD10.this._maxBinaryMessageSize; /* ------------------------------------------------------------ */ - public synchronized void sendMessage(String content) throws IOException + public void sendMessage(String content) throws IOException { if (_closedOut) throw new IOException("closing"); byte[] data = content.getBytes(StringUtil.__UTF8); - _outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_TEXT,data,0,data.length); + _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_TEXT,data,0,data.length); checkWriteable(); _idle.access(_endp); } /* ------------------------------------------------------------ */ - public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException + public void sendMessage(byte[] content, int offset, int length) throws IOException { if (_closedOut) throw new IOException("closing"); - _outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_BINARY,content,offset,length); + _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_BINARY,content,offset,length); checkWriteable(); _idle.access(_endp); } @@ -432,7 +434,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc { if (_closedOut) throw new IOException("closing"); - _outbound.addFrame((byte)0x8,ctrl,data,offset,length); + _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length); checkWriteable(); _idle.access(_endp); } @@ -509,7 +511,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ public byte finMask() { - return 0x8; + return FLAG_FIN; } /* ------------------------------------------------------------ */ @@ -559,6 +561,18 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc { close(CLOSE_NORMAL,null); } + + /* ------------------------------------------------------------ */ + public void setFakeFragments(boolean fake) + { + _parser.setFakeFragments(fake); + } + + /* ------------------------------------------------------------ */ + public boolean isFakeFragments() + { + return _parser.isFakeFragments(); + } /* ------------------------------------------------------------ */ public String toString() @@ -570,7 +584,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - private class FrameHandlerD07 implements WebSocketParser.FrameHandler + private class WSFrameHandler implements WebSocketParser.FrameHandler { private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); private ByteArrayBuffer _aggregate; @@ -585,175 +599,174 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc // Ignore incoming after a close if (_closedIn) return; - - try - { - byte[] array=buffer.array(); + } + try + { + byte[] array=buffer.array(); - // Deliver frame if websocket is a FrameWebSocket - if (_onFrame!=null) + // Deliver frame if websocket is a FrameWebSocket + if (_onFrame!=null) + { + if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + if (_onControl!=null && isControlFrame(opcode)) + { + if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + switch(opcode) + { + case WebSocketConnectionD10.OP_CONTINUATION: { - if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) - return; - } - - if (_onControl!=null && isControlFrame(opcode)) - { - if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) - return; - } - - switch(opcode) - { - case WebSocketConnectionD10.OP_CONTINUATION: + // If text, append to the message buffer + if (_opcode==WebSocketConnectionD10.OP_TEXT && _connection.getMaxTextMessageSize()>=0) { - // If text, append to the message buffer - if (_opcode==WebSocketConnectionD10.OP_TEXT && _connection.getMaxTextMessageSize()>=0) + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) { - if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + // If this is the last fragment, deliver the text buffer + if (lastFrame && _onTextMessage!=null) { - // If this is the last fragment, deliver the text buffer - if (lastFrame && _onTextMessage!=null) + _opcode=-1; + String msg =_utf8.toString(); + _utf8.reset(); + _onTextMessage.onMessage(msg); + } + } + else + { + _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); + _utf8.reset(); + _opcode=-1; + } + } + else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) + { + if (_aggregate.space()<_aggregate.length()) + { + _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + _aggregate.clear(); + _opcode=-1; + } + else + { + _aggregate.put(buffer); + + // If this is the last fragment, deliver + if (lastFrame && _onBinaryMessage!=null) + { + try + { + _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); + } + finally { _opcode=-1; - String msg =_utf8.toString(); - _utf8.reset(); - _onTextMessage.onMessage(msg); - } - } - else - { - _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); - _utf8.reset(); - _opcode=-1; - } - } - else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) - { - if (_aggregate.space()<_aggregate.length()) - { - _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); - _aggregate.clear(); - _opcode=-1; - } - else - { - _aggregate.put(buffer); - - // If this is the last fragment, deliver - if (lastFrame && _onBinaryMessage!=null) - { - try - { - _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); - } - finally - { - _opcode=-1; - _aggregate.clear(); - } + _aggregate.clear(); } } } - break; } - case WebSocketConnectionD10.OP_PING: - { - Log.debug("PING {}",this); - if (!_closedOut) - _connection.sendControl(WebSocketConnectionD10.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); - break; - } - - case WebSocketConnectionD10.OP_PONG: - { - Log.debug("PONG {}",this); - break; - } - - case WebSocketConnectionD10.OP_CLOSE: - { - int code=WebSocketConnectionD10.CLOSE_NOCODE; - String message=null; - if (buffer.length()>=2) - { - code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1]; - if (buffer.length()>2) - message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8); - } - closeIn(code,message); - break; - } - - - case WebSocketConnectionD10.OP_TEXT: - { - if(_onTextMessage!=null) - { - if (lastFrame) - { - // Deliver the message - _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); - } - else - { - if (_connection.getMaxTextMessageSize()>=0) - { - // If this is a text fragment, append to buffer - if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) - _opcode=WebSocketConnectionD10.OP_TEXT; - else - { - _utf8.reset(); - _opcode=-1; - _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); - } - } - } - } - break; - } - - default: - { - if (_onBinaryMessage!=null) - { - if (lastFrame) - { - _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); - } - else - { - if (_connection.getMaxBinaryMessageSize()>=0) - { - if (buffer.length()>_connection.getMaxBinaryMessageSize()) - { - _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); - if (_aggregate!=null) - _aggregate.clear(); - _opcode=-1; - } - else - { - _opcode=opcode; - if (_aggregate==null) - _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); - _aggregate.put(buffer); - } - } - } - } - } + break; } + case WebSocketConnectionD10.OP_PING: + { + Log.debug("PING {}",this); + if (!_closedOut) + _connection.sendControl(WebSocketConnectionD10.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); + break; + } + + case WebSocketConnectionD10.OP_PONG: + { + Log.debug("PONG {}",this); + break; + } + + case WebSocketConnectionD10.OP_CLOSE: + { + int code=WebSocketConnectionD10.CLOSE_NOCODE; + String message=null; + if (buffer.length()>=2) + { + code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1]; + if (buffer.length()>2) + message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8); + } + closeIn(code,message); + break; + } + + + case WebSocketConnectionD10.OP_TEXT: + { + if(_onTextMessage!=null) + { + if (lastFrame) + { + // Deliver the message + _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); + } + else + { + if (_connection.getMaxTextMessageSize()>=0) + { + // If this is a text fragment, append to buffer + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + _opcode=WebSocketConnectionD10.OP_TEXT; + else + { + _utf8.reset(); + _opcode=-1; + _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); + } + } + } + } + break; + } + + default: + { + if (_onBinaryMessage!=null) + { + if (lastFrame) + { + _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); + } + else + { + if (_connection.getMaxBinaryMessageSize()>=0) + { + if (buffer.length()>_connection.getMaxBinaryMessageSize()) + { + _connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + if (_aggregate!=null) + _aggregate.clear(); + _opcode=-1; + } + else + { + _opcode=opcode; + if (_aggregate==null) + _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); + _aggregate.put(buffer); + } + } + } + } + } } - catch(ThreadDeath th) - { - throw th; - } - catch(Throwable th) - { - Log.warn(th); - } + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD10.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD10.java index d1654b9ce3a..59b8fd45c17 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD10.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD10.java @@ -46,7 +46,6 @@ public class WebSocketParserD10 implements WebSocketParser } }; - private final WebSocketBuffers _buffers; private final EndPoint _endp; private final FrameHandler _handler; @@ -61,6 +60,7 @@ public class WebSocketParserD10 implements WebSocketParser private final byte[] _mask = new byte[4]; private int _m; private boolean _skip; + private boolean _fakeFragments=true; /* ------------------------------------------------------------ */ /** @@ -79,6 +79,24 @@ public class WebSocketParserD10 implements WebSocketParser _state=State.START; } + /* ------------------------------------------------------------ */ + /** + * @return True if fake fragments should be created for frames larger than the buffer. + */ + public boolean isFakeFragments() + { + return _fakeFragments; + } + + /* ------------------------------------------------------------ */ + /** + * @param fakeFragments True if fake fragments should be created for frames larger than the buffer. + */ + public void setFakeFragments(boolean fakeFragments) + { + _fakeFragments = fakeFragments; + } + /* ------------------------------------------------------------ */ public boolean isBufferEmpty() { @@ -120,7 +138,33 @@ public class WebSocketParserD10 implements WebSocketParser // if no space, then the data is too big for buffer if (_buffer.space() == 0) + { + // Can we send a fake frame? + if (_fakeFragments && _state==State.DATA) + { + Buffer data =_buffer.get(4*(available/4)); + _buffer.compact(); + if (_masked) + { + if (data.array()==null) + data=_buffer.asMutableBuffer(); + byte[] array = data.array(); + final int end=data.putIndex(); + for (int i=data.getIndex();i>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length()); + events++; + _bytesNeeded-=data.length(); + _handler.onFrame((byte)(_flags&(0xff^WebSocketConnectionD10.FLAG_FIN)), _opcode, data); + + _opcode=WebSocketConnectionD10.OP_CONTINUATION; + } + + if (_buffer.space() == 0) throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity()); + } // catch IOExceptions (probably EOF) and try to parse what we have try @@ -200,7 +244,7 @@ public class WebSocketParserD10 implements WebSocketParser _length = _length*0x100 + (0xff&b); if (--_bytesNeeded==0) { - if (_length>_buffer.capacity()) + if (_length>_buffer.capacity() && !_fakeFragments) { events++; _handler.close(WebSocketConnectionD10.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity()); diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD10Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD10Test.java index a08d3825574..3990fc06d9b 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD10Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD10Test.java @@ -24,7 +24,7 @@ public class WebSocketParserD10Test { private MaskedByteArrayBuffer _in; private Handler _handler; - private WebSocketParser _parser; + private WebSocketParserD10 _parser; private byte[] _mask = new byte[] {(byte)0x00,(byte)0xF0,(byte)0x0F,(byte)0xFF}; private int _m; @@ -88,6 +88,7 @@ public class WebSocketParserD10Test endPoint.setNonBlocking(true); _handler = new Handler(); _parser=new WebSocketParserD10(buffers, endPoint,_handler,true); + _parser.setFakeFragments(false); _in = new MaskedByteArrayBuffer(); endPoint.setIn(_in); @@ -250,6 +251,7 @@ public class WebSocketParserD10Test public void testFrameTooLarge() throws Exception { // Buffers are only 1024, so this frame is too large + _parser.setFakeFragments(false); _in.putUnmasked((byte)0x81); _in.putUnmasked((byte)(0x80|0x7E)); @@ -287,6 +289,27 @@ public class WebSocketParserD10Test assertEquals(1024,_handler._data.get(0).length()); } + @Test + public void testFakeFragement() throws Exception + { + // Buffers are only 1024, so this frame will be fake fragmented + _parser.setFakeFragments(true); + + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)(0x80|0x7E)); + _in.putUnmasked((byte)(2048>>8)); + _in.putUnmasked((byte)(2048&0xff)); + _in.sendMask(); + for (int i=0;i<2048;i++) + _in.put((byte)'a'); + + int progress =_parser.parseNext(); + assertTrue(progress>0); + + assertEquals(2,_handler._frames); + assertEquals(WebSocketConnectionD10.OP_CONTINUATION,_handler._opcode); + } + private class Handler implements WebSocketParser.FrameHandler { Utf8StringBuilder _utf8 = new Utf8StringBuilder(); @@ -295,9 +318,11 @@ public class WebSocketParserD10Test private byte _opcode; int _code; String _message; + int _frames; public void onFrame(byte flags, byte opcode, Buffer buffer) { + _frames++; _flags=flags; _opcode=opcode; if ((flags&0x8)==0)