353073 Support fake fragements, so that frames may be larger than buffers

This commit is contained in:
Greg Wilkins 2011-08-11 13:26:12 +10:00
parent d63294bfe4
commit c07e14840c
6 changed files with 282 additions and 176 deletions

View File

@ -148,7 +148,10 @@ public interface WebSocket
byte textOpcode();
byte continuationOpcode();
byte finMask();
String getProtocol();
void setFakeFragments(boolean fake);
boolean isFakeFragments();
boolean isControl(byte opcode);
boolean isText(byte opcode);
boolean isBinary(byte opcode);

View File

@ -530,4 +530,16 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
{
return 0;
}
public void setFakeFragments(boolean fake)
{
// TODO Auto-generated method stub
}
public boolean isFakeFragments()
{
// TODO Auto-generated method stub
return false;
}
}

View File

@ -491,6 +491,15 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{
return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
}
public void setFakeFragments(boolean fake)
{
}
public boolean isFakeFragments()
{
return false;
}
}
/* ------------------------------------------------------------ */

View File

@ -45,7 +45,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
final static byte OP_TEXT = 0x01;
final static byte OP_BINARY = 0x02;
final static byte OP_EXT_DATA = 0x03;
final static byte OP_CONTROL = 0x08;
final static byte OP_CLOSE = 0x08;
final static byte OP_PING = 0x09;
final static byte OP_PONG = 0x0A;
@ -60,16 +61,18 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
final static int CLOSE_NOCLOSE=1006;
final static int CLOSE_NOTUTF8=1007;
final static int FLAG_FIN=0x8;
final static int VERSION=8;
static boolean isLastFrame(byte flags)
{
return (flags&0x8)!=0;
return (flags&FLAG_FIN)!=0;
}
static boolean isControlFrame(byte opcode)
{
return (opcode&0x8)!=0;
return (opcode&OP_CONTROL)!=0;
}
private final static byte[] MAGIC;
@ -87,8 +90,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
private final String _protocol;
private final int _draft;
private int _close;
private boolean _closedIn;
private boolean _closedOut;
private volatile boolean _closedIn;
private volatile boolean _closedOut;
private int _maxTextMessageSize;
private int _maxBinaryMessageSize=-1;
@ -105,12 +108,12 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
}
}
private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD07();
private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler();
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private final WebSocket.FrameConnection _connection = new FrameConnectionD10();
private final WebSocket.FrameConnection _connection = new WSFrameConnection();
/* ------------------------------------------------------------ */
@ -341,7 +344,6 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
_close=code;
}
try
{
if (closed)
@ -360,7 +362,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
bytes[0]=(byte)(code/0x100);
bytes[1]=(byte)(code%0x100);
_outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_CLOSE,bytes,0,bytes.length);
}
_outbound.flush();
@ -390,29 +392,29 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameConnectionD10 implements WebSocket.FrameConnection
private class WSFrameConnection implements WebSocket.FrameConnection
{
volatile boolean _disconnecting;
int _maxTextMessage=WebSocketConnectionD10.this._maxTextMessageSize;
int _maxBinaryMessage=WebSocketConnectionD10.this._maxBinaryMessageSize;
/* ------------------------------------------------------------ */
public synchronized void sendMessage(String content) throws IOException
public void sendMessage(String content) throws IOException
{
if (_closedOut)
throw new IOException("closing");
byte[] data = content.getBytes(StringUtil.__UTF8);
_outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_TEXT,data,0,data.length);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_TEXT,data,0,data.length);
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
public void sendMessage(byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_outbound.addFrame((byte)0x8,WebSocketConnectionD10.OP_BINARY,content,offset,length);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD10.OP_BINARY,content,offset,length);
checkWriteable();
_idle.access(_endp);
}
@ -432,7 +434,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
{
if (_closedOut)
throw new IOException("closing");
_outbound.addFrame((byte)0x8,ctrl,data,offset,length);
_outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length);
checkWriteable();
_idle.access(_endp);
}
@ -509,7 +511,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
public byte finMask()
{
return 0x8;
return FLAG_FIN;
}
/* ------------------------------------------------------------ */
@ -559,6 +561,18 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
{
close(CLOSE_NORMAL,null);
}
/* ------------------------------------------------------------ */
public void setFakeFragments(boolean fake)
{
_parser.setFakeFragments(fake);
}
/* ------------------------------------------------------------ */
public boolean isFakeFragments()
{
return _parser.isFakeFragments();
}
/* ------------------------------------------------------------ */
public String toString()
@ -570,7 +584,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameHandlerD07 implements WebSocketParser.FrameHandler
private class WSFrameHandler implements WebSocketParser.FrameHandler
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private ByteArrayBuffer _aggregate;
@ -585,175 +599,174 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
// Ignore incoming after a close
if (_closedIn)
return;
try
{
byte[] array=buffer.array();
}
try
{
byte[] array=buffer.array();
// Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null)
// Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null)
{
if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
return;
}
if (_onControl!=null && isControlFrame(opcode))
{
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return;
}
switch(opcode)
{
case WebSocketConnectionD10.OP_CONTINUATION:
{
if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
return;
}
if (_onControl!=null && isControlFrame(opcode))
{
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return;
}
switch(opcode)
{
case WebSocketConnectionD10.OP_CONTINUATION:
// If text, append to the message buffer
if (_opcode==WebSocketConnectionD10.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
{
// If text, append to the message buffer
if (_opcode==WebSocketConnectionD10.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
// If this is the last fragment, deliver the text buffer
if (lastFrame && _onTextMessage!=null)
{
// If this is the last fragment, deliver the text buffer
if (lastFrame && _onTextMessage!=null)
_opcode=-1;
String msg =_utf8.toString();
_utf8.reset();
_onTextMessage.onMessage(msg);
}
}
else
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
}
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{
if (_aggregate.space()<_aggregate.length())
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{
_aggregate.put(buffer);
// If this is the last fragment, deliver
if (lastFrame && _onBinaryMessage!=null)
{
try
{
_onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
}
finally
{
_opcode=-1;
String msg =_utf8.toString();
_utf8.reset();
_onTextMessage.onMessage(msg);
}
}
else
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
}
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{
if (_aggregate.space()<_aggregate.length())
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{
_aggregate.put(buffer);
// If this is the last fragment, deliver
if (lastFrame && _onBinaryMessage!=null)
{
try
{
_onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
}
finally
{
_opcode=-1;
_aggregate.clear();
}
_aggregate.clear();
}
}
}
break;
}
case WebSocketConnectionD10.OP_PING:
{
Log.debug("PING {}",this);
if (!_closedOut)
_connection.sendControl(WebSocketConnectionD10.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
break;
}
case WebSocketConnectionD10.OP_PONG:
{
Log.debug("PONG {}",this);
break;
}
case WebSocketConnectionD10.OP_CLOSE:
{
int code=WebSocketConnectionD10.CLOSE_NOCODE;
String message=null;
if (buffer.length()>=2)
{
code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
if (buffer.length()>2)
message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
}
closeIn(code,message);
break;
}
case WebSocketConnectionD10.OP_TEXT:
{
if(_onTextMessage!=null)
{
if (lastFrame)
{
// Deliver the message
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
}
else
{
if (_connection.getMaxTextMessageSize()>=0)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD10.OP_TEXT;
else
{
_utf8.reset();
_opcode=-1;
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
}
}
}
}
break;
}
default:
{
if (_onBinaryMessage!=null)
{
if (lastFrame)
{
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
}
else
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{
_opcode=opcode;
if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer);
}
}
}
}
}
break;
}
case WebSocketConnectionD10.OP_PING:
{
Log.debug("PING {}",this);
if (!_closedOut)
_connection.sendControl(WebSocketConnectionD10.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
break;
}
case WebSocketConnectionD10.OP_PONG:
{
Log.debug("PONG {}",this);
break;
}
case WebSocketConnectionD10.OP_CLOSE:
{
int code=WebSocketConnectionD10.CLOSE_NOCODE;
String message=null;
if (buffer.length()>=2)
{
code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1];
if (buffer.length()>2)
message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
}
closeIn(code,message);
break;
}
case WebSocketConnectionD10.OP_TEXT:
{
if(_onTextMessage!=null)
{
if (lastFrame)
{
// Deliver the message
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
}
else
{
if (_connection.getMaxTextMessageSize()>=0)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD10.OP_TEXT;
else
{
_utf8.reset();
_opcode=-1;
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
}
}
}
}
break;
}
default:
{
if (_onBinaryMessage!=null)
{
if (lastFrame)
{
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
}
else
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{
_opcode=opcode;
if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer);
}
}
}
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}

View File

@ -46,7 +46,6 @@ public class WebSocketParserD10 implements WebSocketParser
}
};
private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final FrameHandler _handler;
@ -61,6 +60,7 @@ public class WebSocketParserD10 implements WebSocketParser
private final byte[] _mask = new byte[4];
private int _m;
private boolean _skip;
private boolean _fakeFragments=true;
/* ------------------------------------------------------------ */
/**
@ -79,6 +79,24 @@ public class WebSocketParserD10 implements WebSocketParser
_state=State.START;
}
/* ------------------------------------------------------------ */
/**
* @return True if fake fragments should be created for frames larger than the buffer.
*/
public boolean isFakeFragments()
{
return _fakeFragments;
}
/* ------------------------------------------------------------ */
/**
* @param fakeFragments True if fake fragments should be created for frames larger than the buffer.
*/
public void setFakeFragments(boolean fakeFragments)
{
_fakeFragments = fakeFragments;
}
/* ------------------------------------------------------------ */
public boolean isBufferEmpty()
{
@ -120,7 +138,33 @@ public class WebSocketParserD10 implements WebSocketParser
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
{
// Can we send a fake frame?
if (_fakeFragments && _state==State.DATA)
{
Buffer data =_buffer.get(4*(available/4));
_buffer.compact();
if (_masked)
{
if (data.array()==null)
data=_buffer.asMutableBuffer();
byte[] array = data.array();
final int end=data.putIndex();
for (int i=data.getIndex();i<end;i++)
array[i]^=_mask[_m++%4];
}
// System.err.printf("%s %s %s >>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length());
events++;
_bytesNeeded-=data.length();
_handler.onFrame((byte)(_flags&(0xff^WebSocketConnectionD10.FLAG_FIN)), _opcode, data);
_opcode=WebSocketConnectionD10.OP_CONTINUATION;
}
if (_buffer.space() == 0)
throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity());
}
// catch IOExceptions (probably EOF) and try to parse what we have
try
@ -200,7 +244,7 @@ public class WebSocketParserD10 implements WebSocketParser
_length = _length*0x100 + (0xff&b);
if (--_bytesNeeded==0)
{
if (_length>_buffer.capacity())
if (_length>_buffer.capacity() && !_fakeFragments)
{
events++;
_handler.close(WebSocketConnectionD10.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity());

View File

@ -24,7 +24,7 @@ public class WebSocketParserD10Test
{
private MaskedByteArrayBuffer _in;
private Handler _handler;
private WebSocketParser _parser;
private WebSocketParserD10 _parser;
private byte[] _mask = new byte[] {(byte)0x00,(byte)0xF0,(byte)0x0F,(byte)0xFF};
private int _m;
@ -88,6 +88,7 @@ public class WebSocketParserD10Test
endPoint.setNonBlocking(true);
_handler = new Handler();
_parser=new WebSocketParserD10(buffers, endPoint,_handler,true);
_parser.setFakeFragments(false);
_in = new MaskedByteArrayBuffer();
endPoint.setIn(_in);
@ -250,6 +251,7 @@ public class WebSocketParserD10Test
public void testFrameTooLarge() throws Exception
{
// Buffers are only 1024, so this frame is too large
_parser.setFakeFragments(false);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
@ -287,6 +289,27 @@ public class WebSocketParserD10Test
assertEquals(1024,_handler._data.get(0).length());
}
@Test
public void testFakeFragement() throws Exception
{
// Buffers are only 1024, so this frame will be fake fragmented
_parser.setFakeFragments(true);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(2048>>8));
_in.putUnmasked((byte)(2048&0xff));
_in.sendMask();
for (int i=0;i<2048;i++)
_in.put((byte)'a');
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(2,_handler._frames);
assertEquals(WebSocketConnectionD10.OP_CONTINUATION,_handler._opcode);
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
@ -295,9 +318,11 @@ public class WebSocketParserD10Test
private byte _opcode;
int _code;
String _message;
int _frames;
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
_frames++;
_flags=flags;
_opcode=opcode;
if ((flags&0x8)==0)