Fixes #307898 (websocket does not handle messages larger than a buffer).

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1436 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Simone Bordet 2010-04-01 17:20:28 +00:00
parent 012b1e7c22
commit 6fdd5c9677
2 changed files with 271 additions and 107 deletions

View File

@ -1,6 +1,7 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.math.BigInteger;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
@ -9,8 +10,8 @@ import org.eclipse.jetty.io.EndPoint;
/* ------------------------------------------------------------ */
/** WebSocketGenerator.
* This class generates websocket packets.
* It is fully synchronized because it is likely that async
* threads will call the addMessage methods while other
* It is fully synchronized because it is likely that async
* threads will call the addMessage methods while other
* threads are flushing the generator.
*/
public class WebSocketGenerator
@ -18,135 +19,95 @@ public class WebSocketGenerator
final private WebSocketBuffers _buffers;
final private EndPoint _endp;
private Buffer _buffer;
public WebSocketGenerator(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;
_endp=endp;
}
synchronized public void addFrame(byte frame,byte[] content, int blockFor) throws IOException
public synchronized void addFrame(byte frame,byte[] content, int blockFor) throws IOException
{
addFrame(frame,content,0,content.length,blockFor);
}
synchronized public void addFrame(byte frame,byte[] content, int offset, int length, int blockFor) throws IOException
public synchronized void addFrame(byte frame,byte[] content, int offset, int length, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
if ((frame&0x80)==0x80)
if (_buffer.space() == 0)
expelBuffer(blockFor);
if ((frame & WebSocket.LENGTH_FRAME) == WebSocket.LENGTH_FRAME)
{
// Send in a length delimited frame
// maximum of 3 byte length == 21 bits
if (length>2097152)
throw new IllegalArgumentException("too big");
int length_bytes=(length>16384)?3:(length>128)?2:1;
int needed=length+1+length_bytes;
checkSpace(needed,blockFor);
// Send a length delimited frame
_buffer.put(frame);
if (_buffer.space() == 0)
expelBuffer(blockFor);
switch (length_bytes)
// How many bytes we need for the length ?
// We have 7 bits available, so log2(length) / 7 + 1
// For example, 50000 bytes is 2 8-bytes: 11000011 01010000
// but we need to write it in 3 7-bytes 0000011 0000110 1010000
// 65536 == 1 00000000 00000000 => 100 0000000 0000000
int lengthBytes = new BigInteger(String.valueOf(length)).bitLength() / 7 + 1;
for (int i = lengthBytes - 1; i >= 0; --i)
{
case 3:
_buffer.put((byte)(0x80|(length>>14)));
case 2:
_buffer.put((byte)(0x80|(0x7f&(length>>7))));
case 1:
_buffer.put((byte)(0x7f&length));
byte lengthByte = (byte)(0x80 | (0x7F & (length >> 7 * i)));
_buffer.put(lengthByte);
if (_buffer.space() == 0)
expelBuffer(blockFor);
}
_buffer.put(content,offset,length);
}
else
{
// send in a sentinel frame
int needed=length+2;
checkSpace(needed,blockFor);
_buffer.put(frame);
_buffer.put(content,offset,length);
_buffer.put((byte)0xFF);
}
}
synchronized public void addFrame(byte frame, String content, int blockFor) throws IOException
{
Buffer byte_buffer=_buffers.getBuffer();
try
if (_buffer.space() == 0)
expelBuffer(blockFor);
int remaining = length;
while (remaining > 0)
{
byte[] array=byte_buffer.array();
int chars = content.length();
int bytes = 0;
final int limit=array.length-6;
for (int i = 0; i < chars; i++)
int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
_buffer.put(content, offset + (length - remaining), chunk);
remaining -= chunk;
if (_buffer.space() > 0)
{
int code = content.charAt(i);
if (bytes>=limit)
throw new IllegalArgumentException("frame too large");
if ((code & 0xffffff80) == 0)
if (frame == WebSocket.SENTINEL_FRAME)
_buffer.put((byte)0xFF);
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
else
{
// Forcibly flush the data, issuing a blocking write
expelBuffer(blockFor);
if (remaining == 0)
{
array[bytes++]=(byte)(code);
}
else if((code&0xfffff800)==0)
{
array[bytes++]=(byte)(0xc0|(code>>6));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0xffff0000)==0)
{
array[bytes++]=(byte)(0xe0|(code>>12));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0xff200000)==0)
{
array[bytes++]=(byte)(0xf0|(code>>18));
array[bytes++]=(byte)(0x80|((code>>12)&0x3f));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0xf4000000)==0)
{
array[bytes++]=(byte)(0xf8|(code>>24));
array[bytes++]=(byte)(0x80|((code>>18)&0x3f));
array[bytes++]=(byte)(0x80|((code>>12)&0x3f));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0x80000000)==0)
{
array[bytes++]=(byte)(0xfc|(code>>30));
array[bytes++]=(byte)(0x80|((code>>24)&0x3f));
array[bytes++]=(byte)(0x80|((code>>18)&0x3f));
array[bytes++]=(byte)(0x80|((code>>12)&0x3f));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else
{
array[bytes++]=(byte)('?');
if (frame == WebSocket.SENTINEL_FRAME)
_buffer.put((byte)0xFF);
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
}
addFrame(frame,array,0,bytes,blockFor);
}
finally
{
_buffers.returnBuffer(byte_buffer);
}
}
private void checkSpace(int needed, long blockFor)
public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
{
byte[] bytes = content.getBytes("UTF-8");
addFrame(frame, bytes, 0, bytes.length, blockFor);
}
private synchronized void checkSpace(int needed, long blockFor)
throws IOException
{
int space=_buffer.space();
if (space<needed)
{
if (_endp.isBlocking())
@ -175,7 +136,7 @@ public class WebSocketGenerator
space=_buffer.space();
}
}
if (space<needed)
{
_endp.close();
@ -184,12 +145,12 @@ public class WebSocketGenerator
}
}
synchronized public int flush(long blockFor)
public synchronized int flush(long blockFor)
{
return 0;
}
synchronized public int flush() throws IOException
public synchronized int flush() throws IOException
{
int flushed = flushBuffer();
if (_buffer!=null && _buffer.length()==0)
@ -199,12 +160,12 @@ public class WebSocketGenerator
}
return flushed;
}
private int flushBuffer() throws IOException
private synchronized int flushBuffer() throws IOException
{
if (!_endp.isOpen())
return -1;
throw new IOException("Closed");
if (_buffer!=null)
{
int flushed =_endp.flush(_buffer);
@ -215,7 +176,27 @@ public class WebSocketGenerator
return 0;
}
synchronized public boolean isBufferEmpty()
private synchronized void expelBuffer(long blockFor) throws IOException
{
flushBuffer();
_buffer.compact();
if (!_endp.isBlocking())
{
while (_buffer.space()==0)
{
// TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
// TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
boolean ready = _endp.blockWritable(blockFor);
if (!ready)
throw new IOException("Write timeout");
flushBuffer();
_buffer.compact();
}
}
}
public synchronized boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}

View File

@ -0,0 +1,183 @@
package org.eclipse.jetty.websocket;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import junit.framework.TestCase;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
/**
* @version $Revision$ $Date$
*/
public class WebSocketMessageTest extends TestCase
{
private Server _server;
private Connector _connector;
private TestWebSocket _serverWebSocket;
@Override
protected void setUp() throws Exception
{
_server = new Server();
_connector = new SelectChannelConnector();
_server.addConnector(_connector);
WebSocketHandler wsHandler = new WebSocketHandler()
{
@Override
protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return _serverWebSocket = new TestWebSocket();
}
};
wsHandler.setHandler(new DefaultHandler());
_server.setHandler(wsHandler);
_server.start();
}
@Override
protected void tearDown() throws Exception
{
_server.stop();
_server.join();
}
public void testServerSendBigStringMessage() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n").getBytes("ISO-8859-1"));
output.flush();
InputStream input = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "ISO-8859-1"));
String responseLine = reader.readLine();
assertTrue(responseLine.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake"));
// Read until we find an empty line, which signals the end of the http response
String line;
while ((line = reader.readLine()) != null)
if (line.length() == 0)
break;
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
// Server sends a big message
StringBuilder message = new StringBuilder();
String text = "0123456789ABCDEF";
for (int i = 0; i < 64 * 1024 / text.length(); ++i)
message.append(text);
_serverWebSocket.outbound.sendMessage(message.toString());
// Read until we get 0xFF
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while (true)
{
int read = input.read();
baos.write(read);
if (read == 0xFF)
break;
}
baos.close();
byte[] bytes = baos.toByteArray();
String result = StringUtil.printable(bytes);
assertTrue(result.startsWith("0x00"));
assertTrue(result.endsWith("0xFF"));
assertEquals(message.length() + "0x00".length() + "0xFF".length(), result.length());
}
public void testServerSendBigBinaryMessage() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n").getBytes("ISO-8859-1"));
output.flush();
InputStream input = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "ISO-8859-1"));
String responseLine = reader.readLine();
assertTrue(responseLine.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake"));
// Read until we find an empty line, which signals the end of the http response
String line;
while ((line = reader.readLine()) != null)
if (line.length() == 0)
break;
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
// Server sends a big message
StringBuilder message = new StringBuilder();
String text = "0123456789ABCDEF";
for (int i = 0; i < 64 * 1024 / text.length(); ++i)
message.append(text);
byte[] data = message.toString().getBytes("UTF-8");
_serverWebSocket.outbound.sendMessage(WebSocket.LENGTH_FRAME, data);
// I know the format of the message will be: 0x80 0x84 0x80 0x80 ...
int frame = input.read();
assertEquals(0x80, frame);
int length1 = input.read();
assertEquals(0x84, length1);
int length2 = input.read();
assertEquals(0x80, length2);
int length3 = input.read();
assertEquals(0x80, length3);
int read = 0;
while (read < data.length)
{
int b = input.read();
assertTrue(b != -1);
++read;
}
}
private class TestWebSocket implements WebSocket
{
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Outbound outbound;
public void onConnect(Outbound outbound)
{
this.outbound = outbound;
latch.countDown();
}
private boolean awaitConnected(long time) throws InterruptedException
{
return latch.await(time, TimeUnit.MILLISECONDS);
}
public void onMessage(byte frame, String data)
{
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
public void onDisconnect()
{
}
}
}