353073 too large message handling

This commit is contained in:
Greg Wilkins 2011-08-12 14:23:51 +10:00
parent 365bc6bb67
commit 11d89c9e3f
1 changed files with 58 additions and 50 deletions

View File

@ -89,6 +89,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
private final OnControl _onControl; private final OnControl _onControl;
private final String _protocol; private final String _protocol;
private final int _draft; private final int _draft;
private final ClassLoader _context;
private int _close; private int _close;
private volatile boolean _closedIn; private volatile boolean _closedIn;
private volatile boolean _closedOut; private volatile boolean _closedOut;
@ -129,6 +130,8 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
{ {
super(endpoint,timestamp); super(endpoint,timestamp);
_context=Thread.currentThread().getContextClassLoader();
// TODO - can we use the endpoint idle mechanism? // TODO - can we use the endpoint idle mechanism?
if (endpoint instanceof AsyncEndPoint) if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle(); ((AsyncEndPoint)endpoint).cancelIdle();
@ -209,6 +212,9 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
Thread current = Thread.currentThread();
ClassLoader oldcontext = current.getContextClassLoader();
current.setContextClassLoader(_context);
try try
{ {
// handle the framing protocol // handle the framing protocol
@ -242,6 +248,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
} }
finally finally
{ {
current.setContextClassLoader(oldcontext);
if (_endp.isOpen()) if (_endp.isOpen())
{ {
_generator.idle(); _generator.idle();
@ -590,7 +597,7 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
private ByteArrayBuffer _aggregate; private ByteArrayBuffer _aggregate;
private byte _opcode=-1; private byte _opcode=-1;
public void onFrame(byte flags, byte opcode, Buffer buffer) public void onFrame(final byte flags, final byte opcode, final Buffer buffer)
{ {
boolean lastFrame = isLastFrame(flags); boolean lastFrame = isLastFrame(flags);
@ -622,12 +629,12 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
case WebSocketConnectionD10.OP_CONTINUATION: case WebSocketConnectionD10.OP_CONTINUATION:
{ {
// If text, append to the message buffer // If text, append to the message buffer
if (_opcode==WebSocketConnectionD10.OP_TEXT && _connection.getMaxTextMessageSize()>=0) if (_onTextMessage!=null && _opcode==WebSocketConnectionD10.OP_TEXT)
{ {
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{ {
// If this is the last fragment, deliver the text buffer // If this is the last fragment, deliver the text buffer
if (lastFrame && _onTextMessage!=null) if (lastFrame)
{ {
_opcode=-1; _opcode=-1;
String msg =_utf8.toString(); String msg =_utf8.toString();
@ -636,21 +643,12 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
} }
} }
else else
{ textMessageTooLarge();
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
} }
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{ {
if (_aggregate.space()<_aggregate.length()) if (checkBinaryMessageSize(_aggregate.length(),buffer.length()))
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{ {
_aggregate.put(buffer); _aggregate.put(buffer);
@ -699,57 +697,47 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
break; break;
} }
case WebSocketConnectionD10.OP_TEXT: case WebSocketConnectionD10.OP_TEXT:
{ {
if(_onTextMessage!=null) if(_onTextMessage!=null)
{ {
if (lastFrame) if (_connection.getMaxTextMessageSize()<=0)
{ {
// Deliver the message // No size limit, so handle only final frames
if (lastFrame)
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
} }
else else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{ {
if (_connection.getMaxTextMessageSize()>=0) if (lastFrame)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD10.OP_TEXT;
else
{ {
String msg =_utf8.toString();
_utf8.reset(); _utf8.reset();
_opcode=-1; _onTextMessage.onMessage(msg);
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); }
} else
{
_opcode=WebSocketConnectionD10.OP_TEXT;
} }
} }
else
textMessageTooLarge();
} }
break; break;
} }
default: default:
{ {
if (_onBinaryMessage!=null) if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length()))
{ {
if (lastFrame) if (lastFrame)
{ {
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
} }
else else if (_connection.getMaxBinaryMessageSize()>=0)
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{ {
_opcode=opcode; _opcode=opcode;
// TODO use a growing buffer rather than a fixed one.
if (_aggregate==null) if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer); _aggregate.put(buffer);
@ -758,8 +746,6 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
} }
} }
} }
}
}
catch(ThreadDeath th) catch(ThreadDeath th)
{ {
throw th; throw th;
@ -770,6 +756,28 @@ public class WebSocketConnectionD10 extends AbstractConnection implements WebSoc
} }
} }
private boolean checkBinaryMessageSize(int bufferLen, int length)
{
int max = _connection.getMaxBinaryMessageSize();
if (max>0 && (bufferLen+length)>max)
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_opcode=-1;
if (_aggregate!=null)
_aggregate.clear();
return false;
}
return true;
}
private void textMessageTooLarge()
{
_connection.close(WebSocketConnectionD10.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_opcode=-1;
_utf8.reset();
}
public void close(int code,String message) public void close(int code,String message)
{ {
if (code!=CLOSE_NORMAL) if (code!=CLOSE_NORMAL)