344059 Websockets draft-07

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@3035 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2011-04-27 23:27:56 +00:00
parent 7227f8c6bb
commit 3e35b74651
10 changed files with 2708 additions and 21 deletions

View File

@ -1,5 +1,4 @@
jetty-7.4.1-SNAPSHOT
+ JETTY-954 WebAppContext eats any start exceptions instead of stopping the server load
+ 343083 Set nested dispatch type and connection
+ 343277 add support for a context white list
+ 343352 make sure that jetty.osgi.boot is activated when a WAB is registered
@ -7,6 +6,8 @@ jetty-7.4.1-SNAPSHOT
+ 343707 'REQUEST' is printed on console for each incoming HTTP request
+ 343482 refactored overlay deployer layout to use WAR layout
+ 343923 flush timeouts applied to outer loop
+ 344059 Websockets draft-07
+ JETTY-954 WebAppContext eats any start exceptions instead of stopping the server load
jetty-7.4.0.v20110414
+ 342504 Scanner Listener

View File

@ -37,8 +37,8 @@ public class TestClient
private final BufferedWriter _output;
private final BufferedReader _input;
private final SocketEndPoint _endp;
private final WebSocketGeneratorD06 _generator;
private final WebSocketParserD06 _parser;
private final WebSocketGeneratorD07 _generator;
private final WebSocketParserD07 _parser;
private int _framesSent;
private int _messagesSent;
private int _framesReceived;
@ -59,19 +59,19 @@ public class TestClient
{
_framesReceived++;
_frames++;
if (opcode == WebSocketConnectionD06.OP_CLOSE)
if (opcode == WebSocketConnectionD07.OP_CLOSE)
{
byte[] data=buffer.asArray();
// System.err.println("CLOSED: "+((0xff&data[0])*0x100+(0xff&data[1]))+" "+new String(data,2,data.length-2,StringUtil.__UTF8));
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,data,0,data.length,_socket.getSoTimeout());
_generator.addFrame((byte)0x8,WebSocketConnectionD07.OP_CLOSE,data,0,data.length,_socket.getSoTimeout());
_generator.flush(_socket.getSoTimeout());
_socket.shutdownOutput();
_socket.close();
return;
}
else if (opcode == WebSocketConnectionD06.OP_PING)
else if (opcode == WebSocketConnectionD07.OP_PING)
{
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length(),_socket.getSoTimeout());
_generator.addFrame((byte)0x8,WebSocketConnectionD07.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length(),_socket.getSoTimeout());
_generator.flush(_socket.getSoTimeout());
}
@ -81,12 +81,11 @@ public class TestClient
_opcode=opcode;
if (WebSocketConnectionD06.isLastFrame(flags))
if (WebSocketConnectionD07.isLastFrame(flags))
{
_messagesReceived++;
Long start=_starts.take();
long duration = System.nanoTime()-start.longValue();
if (duration>_maxDuration)
_maxDuration=duration;
@ -124,8 +123,8 @@ public class TestClient
_input = new BufferedReader(new InputStreamReader(_socket.getInputStream(), "ISO-8859-1"));
_endp=new SocketEndPoint(_socket);
_generator = new WebSocketGeneratorD06(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD06.FixedMaskGen(new byte[4]));
_parser = new WebSocketParserD06(new WebSocketBuffers(32*1024),_endp,_handler,false);
_generator = new WebSocketGeneratorD07(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD07.FixedMaskGen(new byte[4]));
_parser = new WebSocketParserD07(new WebSocketBuffers(32*1024),_endp,_handler,false);
}
public int getSize()
@ -153,7 +152,7 @@ public class TestClient
"Sec-WebSocket-Key: "+new String(B64Code.encode(key))+"\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: "+_protocol+"\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"Sec-WebSocket-Version: 7\r\n"+
"\r\n");
_output.flush();
@ -171,7 +170,7 @@ public class TestClient
if (line.startsWith("Sec-WebSocket-Accept:"))
{
String accept=line.substring(21).trim();
accepted=accept.equals(WebSocketConnectionD06.hashKey(new String(B64Code.encode(key))));
accepted=accept.equals(WebSocketConnectionD07.hashKey(new String(B64Code.encode(key))));
}
else if (line.startsWith("Sec-WebSocket-Protocol:"))
{
@ -206,7 +205,7 @@ public class TestClient
break;
byte data[]=null;
if (opcode==WebSocketConnectionD06.OP_TEXT)
if (opcode==WebSocketConnectionD07.OP_TEXT)
{
StringBuilder b = new StringBuilder();
while (b.length()<_size)
@ -229,7 +228,7 @@ public class TestClient
{
_framesSent++;
byte flags= (byte)(off+len==data.length?0x8:0);
byte op=(byte)(off==0?opcode:WebSocketConnectionD06.OP_CONTINUATION);
byte op=(byte)(off==0?opcode:WebSocketConnectionD07.OP_CONTINUATION);
if (_verbose)
System.err.printf("%s#addFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(op),TypeUtil.toHexString(data,off,len));
@ -324,16 +323,16 @@ public class TestClient
}
TestClient client = new TestClient(host,port,protocol==null?null:("org.ietf.websocket.test-"+protocol),10000);
TestClient client = new TestClient(host,port,protocol==null?null:protocol,10000);
client.setSize(size);
try
{
client.open();
if (protocol!=null && protocol.startsWith("echo"))
client.ping(count,binary?WebSocketConnectionD06.OP_BINARY:WebSocketConnectionD06.OP_TEXT,fragment);
client.ping(count,binary?WebSocketConnectionD07.OP_BINARY:WebSocketConnectionD07.OP_TEXT,fragment);
else
client.ping(count,WebSocketConnectionD06.OP_PING,-1);
client.ping(count,WebSocketConnectionD07.OP_PING,-1);
}
finally
{

View File

@ -36,16 +36,16 @@ public class TestServer extends Server
{
_websocket = new TestEchoWebSocket();
}
else if ("org.ietf.websocket.test-echo-broadcast".equals(protocol))
else if ("org.ietf.websocket.test-echo-broadcast".equals(protocol) || "echo-broadcast".equals(protocol))
{
_websocket = new TestEchoBroadcastWebSocket();
}
else if ("org.ietf.websocket.test-echo-assemble".equals(protocol))
else if ("org.ietf.websocket.test-echo-assemble".equals(protocol) || "echo-assemble".equals(protocol))
{
_websocket = new TestEchoAssembleWebSocket();
}
else if ("org.ietf.websocket.test-echo-fragment".equals(protocol))
else if ("org.ietf.websocket.test-echo-fragment".equals(protocol) || "echo-fragment".equals(protocol))
{
_websocket = new TestEchoFragmentWebSocket();
}

View File

@ -0,0 +1,721 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.security.MessageDigest;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
public class WebSocketConnectionD07 extends AbstractConnection implements WebSocketConnection
{
final static byte OP_CONTINUATION = 0x00;
final static byte OP_TEXT = 0x01;
final static byte OP_BINARY = 0x02;
final static byte OP_CLOSE = 0x08;
final static byte OP_PING = 0x09;
final static byte OP_PONG = 0x0A;
final static int CLOSE_NORMAL=1000;
final static int CLOSE_SHUTDOWN=1001;
final static int CLOSE_PROTOCOL=1002;
final static int CLOSE_BADDATA=1003;
final static int CLOSE_LARGE=1004;
static boolean isLastFrame(byte flags)
{
return (flags&0x8)!=0;
}
static boolean isControlFrame(byte opcode)
{
return (opcode&0x8)!=0;
}
private final static byte[] MAGIC;
private final IdleCheck _idle;
private final WebSocketParser _parser;
private final WebSocketGenerator _generator;
private final WebSocket _webSocket;
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
private final String _protocol;
private boolean _closedIn;
private boolean _closedOut;
private int _maxTextMessageSize;
private int _maxBinaryMessageSize=-1;
static
{
try
{
MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1);
}
catch (UnsupportedEncodingException e)
{
throw new RuntimeException(e);
}
}
private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD07();
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private final WebSocket.FrameConnection _connection = new FrameConnectionD07();
/* ------------------------------------------------------------ */
public WebSocketConnectionD07(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
throws IOException
{
super(endpoint,timestamp);
// TODO - can we use the endpoint idle mechanism?
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_endp.setMaxIdleTime(maxIdleTime);
_webSocket = websocket;
_onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
_onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
_onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
_onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
_generator = new WebSocketGeneratorD07(buffers, _endp,null);
_parser = new WebSocketParserD07(buffers, endpoint, _frameHandler,true);
_protocol=protocol;
// TODO should these be AsyncEndPoint checks/calls?
if (_endp instanceof SelectChannelEndPoint)
{
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
scep.cancelIdle();
_idle=new IdleCheck()
{
public void access(EndPoint endp)
{
scep.scheduleIdle();
}
};
scep.scheduleIdle();
}
else
{
_idle = new IdleCheck()
{
public void access(EndPoint endp)
{}
};
}
_maxTextMessageSize=buffers.getBufferSize();
_maxBinaryMessageSize=-1;
}
/* ------------------------------------------------------------ */
public WebSocket.Connection getConnection()
{
return _connection;
}
/* ------------------------------------------------------------ */
public Connection handle() throws IOException
{
try
{
// handle the framing protocol
boolean progress=true;
while (progress)
{
int flushed=_generator.flush();
int filled=_parser.parseNext();
progress = flushed>0 || filled>0;
if (filled<0 || flushed<0)
{
_endp.close();
break;
}
}
}
catch(IOException e)
{
try
{
_endp.close();
}
catch(IOException e2)
{
Log.ignore(e2);
}
throw e;
}
finally
{
if (_endp.isOpen())
{
_idle.access(_endp);
if (_closedIn && _closedOut && _generator.isBufferEmpty())
_endp.close();
else if (_endp.isInputShutdown() && !_closedIn)
closeIn(CLOSE_PROTOCOL,null);
else
checkWriteable();
}
}
return this;
}
/* ------------------------------------------------------------ */
public boolean isIdle()
{
return _parser.isBufferEmpty() && _generator.isBufferEmpty();
}
/* ------------------------------------------------------------ */
@Override
public void idleExpired()
{
closeOut(WebSocketConnectionD07.CLOSE_NORMAL,"Idle");
}
/* ------------------------------------------------------------ */
public boolean isSuspended()
{
return false;
}
/* ------------------------------------------------------------ */
public void closed()
{
_webSocket.onClose(WebSocketConnectionD07.CLOSE_NORMAL,"");
}
/* ------------------------------------------------------------ */
public synchronized void closeIn(int code,String message)
{
Log.debug("ClosedIn {} {}",this,message);
try
{
if (_closedOut)
_endp.close();
else
closeOut(code,message);
}
catch(IOException e)
{
Log.ignore(e);
}
finally
{
_closedIn=true;
}
}
/* ------------------------------------------------------------ */
public synchronized void closeOut(int code,String message)
{
Log.debug("ClosedOut {} {}",this,message);
try
{
if (_closedIn || _closedOut)
_endp.close();
else
{
if (code<=0)
code=WebSocketConnectionD07.CLOSE_NORMAL;
byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
bytes[0]=(byte)(code/0x100);
bytes[1]=(byte)(code%0x100);
_generator.addFrame((byte)0x8,WebSocketConnectionD07.OP_CLOSE,bytes,0,bytes.length,_endp.getMaxIdleTime());
}
_generator.flush();
}
catch(IOException e)
{
Log.ignore(e);
}
finally
{
_closedOut=true;
}
}
/* ------------------------------------------------------------ */
public void fillBuffersFrom(Buffer buffer)
{
_parser.fill(buffer);
}
/* ------------------------------------------------------------ */
private void checkWriteable()
{
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
{
((AsyncEndPoint)_endp).scheduleWrite();
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameConnectionD07 implements WebSocket.FrameConnection
{
volatile boolean _disconnecting;
int _maxTextMessage=WebSocketConnectionD07.this._maxTextMessageSize;
int _maxBinaryMessage=WebSocketConnectionD07.this._maxBinaryMessageSize;
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String)
*/
public synchronized void sendMessage(String content) throws IOException
{
if (_closedOut)
throw new IOException("closing");
byte[] data = content.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,WebSocketConnectionD07.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int)
*/
public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,WebSocketConnectionD07.OP_BINARY,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int)
*/
public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public boolean isMessageComplete(byte flags)
{
return isLastFrame(flags);
}
/* ------------------------------------------------------------ */
public boolean isOpen()
{
return _endp!=null&&_endp.isOpen();
}
/* ------------------------------------------------------------ */
public void close(int code, String message)
{
if (_disconnecting)
return;
_disconnecting=true;
WebSocketConnectionD07.this.closeOut(code,message);
}
/* ------------------------------------------------------------ */
public void setMaxTextMessageSize(int size)
{
_maxTextMessage=size;
}
/* ------------------------------------------------------------ */
public void setMaxBinaryMessageSize(int size)
{
_maxBinaryMessage=size;
}
/* ------------------------------------------------------------ */
public int getMaxTextMessageSize()
{
return _maxTextMessage;
}
/* ------------------------------------------------------------ */
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessage;
}
/* ------------------------------------------------------------ */
public String getProtocol()
{
return _protocol;
}
/* ------------------------------------------------------------ */
public byte binaryOpcode()
{
return OP_BINARY;
}
/* ------------------------------------------------------------ */
public byte textOpcode()
{
return OP_TEXT;
}
/* ------------------------------------------------------------ */
public boolean isControl(byte opcode)
{
return isControlFrame(opcode);
}
/* ------------------------------------------------------------ */
public boolean isText(byte opcode)
{
return opcode==OP_TEXT;
}
/* ------------------------------------------------------------ */
public boolean isBinary(byte opcode)
{
return opcode==OP_BINARY;
}
/* ------------------------------------------------------------ */
public boolean isContinuation(byte opcode)
{
return opcode==OP_CONTINUATION;
}
/* ------------------------------------------------------------ */
public boolean isClose(byte opcode)
{
return opcode==OP_CLOSE;
}
/* ------------------------------------------------------------ */
public boolean isPing(byte opcode)
{
return opcode==OP_PING;
}
/* ------------------------------------------------------------ */
public boolean isPong(byte opcode)
{
return opcode==OP_PONG;
}
/* ------------------------------------------------------------ */
public void disconnect()
{
close(CLOSE_NORMAL,null);
}
/* ------------------------------------------------------------ */
public String toString()
{
return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameHandlerD07 implements WebSocketParser.FrameHandler
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private ByteArrayBuffer _aggregate;
private byte _opcode=-1;
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
boolean lastFrame = isLastFrame(flags);
synchronized(WebSocketConnectionD07.this)
{
// Ignore incoming after a close
if (_closedIn)
return;
try
{
byte[] array=buffer.array();
// Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null)
{
if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
return;
}
if (_onControl!=null && isControlFrame(opcode))
{
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return;
}
switch(opcode)
{
case WebSocketConnectionD07.OP_CONTINUATION:
{
// If text, append to the message buffer
if (_opcode==WebSocketConnectionD07.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
{
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{
// If this is the last fragment, deliver the text buffer
if (lastFrame && _onTextMessage!=null)
{
_opcode=-1;
String msg =_utf8.toString();
_utf8.reset();
_onTextMessage.onMessage(msg);
}
}
else
{
_connection.close(WebSocketConnectionD07.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
}
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{
if (_aggregate.space()<_aggregate.length())
{
_connection.close(WebSocketConnectionD07.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{
_aggregate.put(buffer);
// If this is the last fragment, deliver
if (lastFrame && _onBinaryMessage!=null)
{
try
{
_onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
}
finally
{
_opcode=-1;
_aggregate.clear();
}
}
}
}
break;
}
case WebSocketConnectionD07.OP_PING:
{
Log.debug("PING {}",this);
if (!_closedOut)
_connection.sendControl(WebSocketConnectionD07.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
break;
}
case WebSocketConnectionD07.OP_PONG:
{
Log.debug("PONG {}",this);
break;
}
case WebSocketConnectionD07.OP_CLOSE:
{
int code=-1;
String message=null;
if (buffer.length()>=2)
{
code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
if (buffer.length()>2)
message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
}
closeIn(code,message);
break;
}
case WebSocketConnectionD07.OP_TEXT:
{
if(_onTextMessage!=null)
{
if (lastFrame)
{
// Deliver the message
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
}
else
{
if (_connection.getMaxTextMessageSize()>=0)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD07.OP_TEXT;
else
{
_utf8.reset();
_opcode=-1;
_connection.close(WebSocketConnectionD07.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
}
}
}
}
break;
}
default:
{
if (_onBinaryMessage!=null)
{
if (lastFrame)
{
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
}
else
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.close(WebSocketConnectionD07.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{
_opcode=opcode;
if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer);
}
}
}
}
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
}
public void close(int code,String message)
{
_connection.close(code,message);
}
public String toString()
{
return WebSocketConnectionD07.this.toString()+"FH";
}
}
/* ------------------------------------------------------------ */
private interface IdleCheck
{
void access(EndPoint endp);
}
/* ------------------------------------------------------------ */
public void handshake(HttpServletRequest request, HttpServletResponse response, String origin, String subprotocol) throws IOException
{
String uri=request.getRequestURI();
String query=request.getQueryString();
if (query!=null && query.length()>0)
uri+="?"+query;
String key = request.getHeader("Sec-WebSocket-Key");
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
response.addHeader("Sec-WebSocket-Accept",hashKey(key));
if (subprotocol!=null)
response.addHeader("Sec-WebSocket-Protocol",subprotocol);
response.sendError(101);
if (_onFrame!=null)
_onFrame.onHandshake(_connection);
_webSocket.onOpen(_connection);
}
/* ------------------------------------------------------------ */
public static String hashKey(String key)
{
try
{
MessageDigest md = MessageDigest.getInstance("SHA1");
md.update(key.getBytes("UTF-8"));
md.update(MAGIC);
return new String(B64Code.encode(md.digest()));
}
catch (Exception e)
{
throw new RuntimeException(e);
}
}
}

View File

@ -128,6 +128,9 @@ public class WebSocketFactory
case 6:
connection = new WebSocketConnectionD06(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
break;
case 7:
connection = new WebSocketConnectionD07(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
break;
default:
Log.warn("Unsupported Websocket version: "+draft);
throw new HttpException(400, "Unsupported draft specification: " + draft);

View File

@ -0,0 +1,278 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.security.SecureRandom;
import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */
/** WebSocketGenerator.
* This class generates websocket packets.
* It is fully synchronized because it is likely that async
* threads will call the addMessage methods while other
* threads are flushing the generator.
*/
public class WebSocketGeneratorD07 implements WebSocketGenerator
{
final private WebSocketBuffers _buffers;
final private EndPoint _endp;
private Buffer _buffer;
private final byte[] _mask=new byte[4];
private int _m;
private boolean _opsent;
private final MaskGen _maskGen;
public interface MaskGen
{
void genMask(byte[] mask);
}
public static class NullMaskGen implements MaskGen
{
public void genMask(byte[] mask)
{
mask[0]=mask[1]=mask[2]=mask[3]=0;
}
}
public static class FixedMaskGen implements MaskGen
{
final byte[] _mask;
public FixedMaskGen()
{
_mask=new byte[]{(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
}
public FixedMaskGen(byte[] mask)
{
_mask=mask;
}
public void genMask(byte[] mask)
{
mask[0]=_mask[0];
mask[1]=_mask[1];
mask[2]=_mask[2];
mask[3]=_mask[3];
}
}
public static class RandomMaskGen implements MaskGen
{
final Random _random;
public RandomMaskGen()
{
_random=new SecureRandom();
}
public RandomMaskGen(Random random)
{
_random=random;
}
public void genMask(byte[] mask)
{
_random.nextBytes(mask);
}
}
public WebSocketGeneratorD07(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;
_endp=endp;
_maskGen=null;
}
public WebSocketGeneratorD07(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen)
{
_buffers=buffers;
_endp=endp;
_maskGen=maskGen;
}
public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
{
// System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
boolean mask=_maskGen!=null;
if (_buffer==null)
_buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer();
boolean last=WebSocketConnectionD07.isLastFrame(flags);
opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
int space=mask?14:10;
do
{
opcode = _opsent?WebSocketConnectionD07.OP_CONTINUATION:opcode;
_opsent=true;
int payload=length;
if (payload+space>_buffer.capacity())
{
// We must fragement, so clear FIN bit
opcode&=(byte)0x7F; // Clear the FIN bit
payload=_buffer.capacity()-space;
}
else if (last)
opcode|=(byte)0x80; // Set the FIN bit
// ensure there is space for header
if (_buffer.space() <= space)
expelBuffer(blockFor);
// write the opcode and length
if (payload>0xffff)
{
_buffer.put(new byte[]{
opcode,
mask?(byte)0xff:(byte)0x7f,
(byte)((payload>>56)&0x7f),
(byte)((payload>>48)&0xff),
(byte)((payload>>40)&0xff),
(byte)((payload>>32)&0xff),
(byte)((payload>>24)&0xff),
(byte)((payload>>16)&0xff),
(byte)((payload>>8)&0xff),
(byte)(payload&0xff)});
}
else if (payload >=0x7e)
{
_buffer.put(new byte[]{
opcode,
mask?(byte)0xfe:(byte)0x7e,
(byte)(payload>>8),
(byte)(payload&0xff)});
}
else
{
_buffer.put(new byte[]{
opcode,
(byte)(mask?(0x80|payload):payload)});
}
// write mask
if (mask)
{
_maskGen.genMask(_mask);
_m=0;
_buffer.put(_mask);
}
// write payload
int remaining = payload;
while (remaining > 0)
{
_buffer.compact();
int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
if (mask)
{
for (int i=0;i<chunk;i++)
_buffer.put((byte)(content[offset+ (payload-remaining)+i]^_mask[+_m++%4]));
}
else
_buffer.put(content, offset + (payload - remaining), chunk);
remaining -= chunk;
if (_buffer.space() > 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
else
{
// Forcibly flush the data, issuing a blocking write
expelBuffer(blockFor);
if (remaining == 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
}
}
offset+=payload;
length-=payload;
}
while (length>0);
_opsent=!last;
}
public synchronized int flush(int blockFor) throws IOException
{
return expelBuffer(blockFor);
}
public synchronized int flush() throws IOException
{
int flushed = flushBuffer();
if (_buffer!=null && _buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return flushed;
}
private synchronized int flushBuffer() throws IOException
{
if (!_endp.isOpen())
throw new EofException();
if (_buffer!=null)
return _endp.flush(_buffer);
return 0;
}
private synchronized int expelBuffer(long blockFor) throws IOException
{
if (_buffer==null)
return 0;
int result = flushBuffer();
_buffer.compact();
if (!_endp.isBlocking())
{
while (_buffer.space()==0)
{
// TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
// TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
boolean ready = _endp.blockWritable(blockFor);
if (!ready)
throw new IOException("Write timeout");
result += flushBuffer();
_buffer.compact();
}
}
return result;
}
public synchronized boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
}

View File

@ -0,0 +1,315 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
/* ------------------------------------------------------------ */
/**
* Parser the WebSocket protocol.
*
*/
public class WebSocketParserD07 implements WebSocketParser
{
public enum State {
START(0), OPCODE(1), LENGTH_7(1), LENGTH_16(2), LENGTH_63(8), MASK(4), PAYLOAD(0), DATA(0), SKIP(1);
int _needs;
State(int needs)
{
_needs=needs;
}
int getNeeds()
{
return _needs;
}
};
private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final FrameHandler _handler;
private final boolean _shouldBeMasked;
private State _state;
private Buffer _buffer;
private byte _flags;
private byte _opcode;
private int _bytesNeeded;
private long _length;
private boolean _masked;
private final byte[] _mask = new byte[4];
private int _m;
private boolean _skip;
/* ------------------------------------------------------------ */
/**
* @param buffers The buffers to use for parsing. Only the {@link Buffers#getBuffer()} is used.
* This should be a direct buffer if binary data is mostly used or an indirect buffer if utf-8 data
* is mostly used.
* @param endp
* @param handler
*/
public WebSocketParserD07(WebSocketBuffers buffers, EndPoint endp, FrameHandler handler, boolean shouldBeMasked)
{
_buffers=buffers;
_endp=endp;
_handler=handler;
_shouldBeMasked=shouldBeMasked;
_state=State.START;
}
/* ------------------------------------------------------------ */
public boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
/* ------------------------------------------------------------ */
public Buffer getBuffer()
{
return _buffer;
}
/* ------------------------------------------------------------ */
/** Parse to next event.
* Parse to the next {@link WebSocketParser.FrameHandler} event or until no more data is
* available. Fill data from the {@link EndPoint} only as necessary.
* @return An indication of progress or otherwise. -1 indicates EOF, 0 indicates
* that no bytes were read and no messages parsed. A positive number indicates either
* the bytes filled or the messages parsed.
*/
public int parseNext()
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
int total_filled=0;
int events=0;
// Loop until a datagram call back or can't fill anymore
while(true)
{
int available=_buffer.length();
// Fill buffer if we need a byte or need length
while (available<(_state==State.SKIP?1:_bytesNeeded))
{
// compact to mark (set at start of data)
_buffer.compact();
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity());
// catch IOExceptions (probably EOF) and try to parse what we have
try
{
int filled=_endp.isOpen()?_endp.fill(_buffer):-1;
if (filled<=0)
return (total_filled+events)>0?(total_filled+events):filled;
total_filled+=filled;
available=_buffer.length();
}
catch(IOException e)
{
Log.debug(e);
return (total_filled+events)>0?(total_filled+events):-1;
}
}
// if we are here, then we have sufficient bytes to process the current state.
// Parse the buffer byte by byte (unless it is STATE_DATA)
byte b;
while (_state!=State.DATA && available>=(_state==State.SKIP?1:_bytesNeeded))
{
switch (_state)
{
case START:
_skip=false;
_state=State.OPCODE;
_bytesNeeded=_state.getNeeds();
continue;
case OPCODE:
b=_buffer.get();
available--;
_opcode=(byte)(b&0xf);
_flags=(byte)(0xf&(b>>4));
if (WebSocketConnectionD07.isControlFrame(_opcode)&&!WebSocketConnectionD07.isLastFrame(_flags))
{
events++;
Log.warn("Fragmented Control from "+_endp);
_handler.close(WebSocketConnectionD07.CLOSE_PROTOCOL,"Fragmented control");
_skip=true;
}
_state=State.LENGTH_7;
_bytesNeeded=_state.getNeeds();
continue;
case LENGTH_7:
b=_buffer.get();
available--;
_masked=(b&0x80)!=0;
b=(byte)(0x7f&b);
switch(b)
{
case 127:
_length=0;
_state=State.LENGTH_63;
break;
case 126:
_length=0;
_state=State.LENGTH_16;
break;
default:
_length=(0x7f&b);
_state=_masked?State.MASK:State.PAYLOAD;
}
_bytesNeeded=_state.getNeeds();
continue;
case LENGTH_16:
b=_buffer.get();
available--;
_length = _length*0x100 + (0xff&b);
if (--_bytesNeeded==0)
{
if (_length>_buffer.capacity())
{
events++;
_handler.close(WebSocketConnectionD07.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity());
_skip=true;
}
_state=_masked?State.MASK:State.PAYLOAD;
_bytesNeeded=_state.getNeeds();
}
continue;
case LENGTH_63:
b=_buffer.get();
available--;
_length = _length*0x100 + (0xff&b);
if (--_bytesNeeded==0)
{
_bytesNeeded=(int)_length;
if (_length>=_buffer.capacity())
{
events++;
_handler.close(WebSocketConnectionD07.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity());
_skip=true;
}
_state=_masked?State.MASK:State.PAYLOAD;
_bytesNeeded=_state.getNeeds();
}
continue;
case MASK:
_buffer.get(_mask,0,4);
_m=0;
available-=4;
_state=State.PAYLOAD;
_bytesNeeded=_state.getNeeds();
break;
case PAYLOAD:
_bytesNeeded=(int)_length;
_state=_skip?State.SKIP:State.DATA;
break;
case DATA:
break;
case SKIP:
int skip=Math.min(available,_bytesNeeded);
_buffer.skip(skip);
available-=skip;
_bytesNeeded-=skip;
if (_bytesNeeded==0)
_state=State.START;
}
}
if (_state==State.DATA && available>=_bytesNeeded)
{
if ( _masked!=_shouldBeMasked)
{
_buffer.skip(_bytesNeeded);
_state=State.START;
events++;
_handler.close(WebSocketConnectionD07.CLOSE_PROTOCOL,"bad mask");
}
else
{
Buffer data =_buffer.get(_bytesNeeded);
if (_masked)
{
if (data.array()==null)
data=_buffer.asMutableBuffer();
byte[] array = data.array();
final int end=data.putIndex();
for (int i=data.getIndex();i<end;i++)
array[i]^=_mask[_m++%4];
}
// System.err.printf("%s %s %s >>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length());
events++;
_handler.onFrame(_flags, _opcode, data);
_bytesNeeded=0;
_state=State.START;
}
if (_buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return total_filled+events;
}
}
}
/* ------------------------------------------------------------ */
public void fill(Buffer buffer)
{
if (buffer!=null && buffer.length()>0)
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
_buffer.put(buffer);
buffer.clear();
}
}
}

View File

@ -0,0 +1,200 @@
package org.eclipse.jetty.websocket;
import static junit.framework.Assert.assertEquals;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketGeneratorD07Test
{
private ByteArrayBuffer _out;
private WebSocketGenerator _generator;
ByteArrayEndPoint _endPoint;
WebSocketBuffers _buffers;
byte[] _mask = new byte[4];
int _m;
public WebSocketGeneratorD07.MaskGen _maskGen = new WebSocketGeneratorD07.FixedMaskGen(
new byte[]{(byte)0x00,(byte)0x00,(byte)0x0f,(byte)0xff});
@Before
public void setUp() throws Exception
{
_endPoint = new ByteArrayEndPoint();
_out = new ByteArrayBuffer(2048);
_endPoint.setOut(_out);
_buffers = new WebSocketBuffers(1024);
_m=0;
}
byte getMasked()
{
return (byte)(_out.get()^_mask[_m++%4]);
}
@Test
public void testOneString() throws Exception
{
_generator = new WebSocketGeneratorD07(_buffers, _endPoint,null);
byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,(byte)0x04,data,0,data.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,0xff&_out.get());
assertEquals('H',_out.get());
assertEquals('e',_out.get());
assertEquals('l',_out.get());
assertEquals('l',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals(' ',_out.get());
assertEquals('W',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals('r',_out.get());
assertEquals('l',_out.get());
assertEquals('d',_out.get());
}
@Test
public void testOneBuffer() throws Exception
{
_generator = new WebSocketGeneratorD07(_buffers, _endPoint,null);
String string = "Hell\uFF4F W\uFF4Frld";
byte[] bytes=string.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,(byte)0x04,bytes,0,bytes.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,0xff&_out.get());
assertEquals('H',_out.get());
assertEquals('e',_out.get());
assertEquals('l',_out.get());
assertEquals('l',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals(' ',_out.get());
assertEquals('W',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals('r',_out.get());
assertEquals('l',_out.get());
assertEquals('d',_out.get());
}
@Test
public void testOneLongBuffer() throws Exception
{
_generator = new WebSocketGeneratorD07(_buffers, _endPoint,null);
byte[] b=new byte[150];
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x8,(byte)0x4,b,0,b.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals((byte)126,_out.get());
assertEquals((byte)0,_out.get());
assertEquals((byte)b.length,_out.get());
for (int i=0;i<b.length;i++)
assertEquals('0'+(i%10),0xff&_out.get());
}
@Test
public void testOneStringMasked() throws Exception
{
_generator = new WebSocketGeneratorD07(_buffers, _endPoint,_maskGen);
byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,(byte)0x04,data,0,data.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,0x7f&_out.get());
_out.get(_mask,0,4);
assertEquals('H',getMasked());
assertEquals('e',getMasked());
assertEquals('l',getMasked());
assertEquals('l',getMasked());
assertEquals(0xEF,0xff&getMasked());
assertEquals(0xBD,0xff&getMasked());
assertEquals(0x8F,0xff&getMasked());
assertEquals(' ',getMasked());
assertEquals('W',getMasked());
assertEquals(0xEF,0xff&getMasked());
assertEquals(0xBD,0xff&getMasked());
assertEquals(0x8F,0xff&getMasked());
assertEquals('r',getMasked());
assertEquals('l',getMasked());
assertEquals('d',getMasked());
}
@Test
public void testOneBufferMasked() throws Exception
{
_generator = new WebSocketGeneratorD07(_buffers, _endPoint,_maskGen);
String string = "Hell\uFF4F W\uFF4Frld";
byte[] bytes=string.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,(byte)0x04,bytes,0,bytes.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,0x7f&_out.get());
_out.get(_mask,0,4);
assertEquals('H',getMasked());
assertEquals('e',getMasked());
assertEquals('l',getMasked());
assertEquals('l',getMasked());
assertEquals(0xEF,0xff&getMasked());
assertEquals(0xBD,0xff&getMasked());
assertEquals(0x8F,0xff&getMasked());
assertEquals(' ',getMasked());
assertEquals('W',getMasked());
assertEquals(0xEF,0xff&getMasked());
assertEquals(0xBD,0xff&getMasked());
assertEquals(0x8F,0xff&getMasked());
assertEquals('r',getMasked());
assertEquals('l',getMasked());
assertEquals('d',getMasked());
}
@Test
public void testOneLongBufferMasked() throws Exception
{
_generator = new WebSocketGeneratorD07(_buffers, _endPoint,_maskGen);
byte[] b=new byte[150];
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x8,(byte)0x04,b,0,b.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals((byte)126,0x7f&_out.get());
assertEquals((byte)0,_out.get());
assertEquals((byte)b.length,_out.get());
_out.get(_mask,0,4);
for (int i=0;i<b.length;i++)
assertEquals('0'+(i%10),0xff&getMasked());
}
}

View File

@ -0,0 +1,849 @@
package org.eclipse.jetty.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketMessageD07Test
{
private static Server _server;
private static Connector _connector;
private static TestWebSocket _serverWebSocket;
@BeforeClass
public static void startServer() throws Exception
{
_server = new Server();
_connector = new SelectChannelConnector();
_server.addConnector(_connector);
WebSocketHandler wsHandler = new WebSocketHandler()
{
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
_serverWebSocket = new TestWebSocket();
_serverWebSocket._onConnect=("onConnect".equals(protocol));
_serverWebSocket._echo=("echo".equals(protocol));
_serverWebSocket._aggregate=("aggregate".equals(protocol));
return _serverWebSocket;
}
};
wsHandler.setBufferSize(8192);
wsHandler.setMaxIdleTime(1000);
wsHandler.setHandler(new DefaultHandler());
_server.setHandler(wsHandler);
_server.start();
}
@AfterClass
public static void stopServer() throws Exception
{
_server.stop();
_server.join();
}
@Test
public void testHash()
{
assertEquals("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",WebSocketConnectionD07.hashKey("dGhlIHNhbXBsZSBub25jZQ=="));
}
@Test
public void testServerSendBigStringMessage() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: chat, superchat\r\n"+
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
// Server sends a big message
StringBuilder message = new StringBuilder();
String text = "0123456789ABCDEF";
for (int i = 0; i < (0x2000) / text.length(); i++)
message.append(text);
String data=message.toString();
_serverWebSocket.connection.sendMessage(data);
assertEquals(WebSocketConnectionD07.OP_TEXT,input.read());
assertEquals(0x7e,input.read());
assertEquals(0x1f,input.read());
assertEquals(0xf6,input.read());
lookFor(data.substring(0,0x1ff6),input);
assertEquals(0x80,input.read());
assertEquals(0x0A,input.read());
lookFor(data.substring(0x1ff6),input);
}
@Test
public void testServerSendOnConnect() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
assertEquals(0x81,input.read());
assertEquals(0x0f,input.read());
lookFor("sent on connect",input);
}
@Test
public void testServerEcho() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: echo\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
output.write(0x84);
output.write(0x8f);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
byte[] bytes="this is an echo".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
assertEquals(0x84,input.read());
assertEquals(0x0f,input.read());
lookFor("this is an echo",input);
}
@Test
public void testServerPingPong() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
// Make sure the read times out if there are problems with the implementation
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: echo\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
output.write(0x89);
output.write(0x80);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.flush();
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
socket.setSoTimeout(1000);
assertEquals(0x8A,input.read());
assertEquals(0x00,input.read());
}
@Test
public void testMaxTextSize() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxTextMessageSize(15);
output.write(0x01);
output.write(0x8a);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
output.write(0x80);
output.write(0x8a);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD07.OP_CLOSE,input.read());
assertEquals(30,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Text message size > 15 chars",input);
}
@Test
public void testMaxTextSize2() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(100000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxTextMessageSize(15);
output.write(0x01);
output.write(0x94);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD07.OP_CLOSE,input.read());
assertEquals(30,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Text message size > 15 chars",input);
}
@Test
public void testBinaryAggregate() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: aggregate\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(1024);
output.write(WebSocketConnectionD07.OP_BINARY);
output.write(0x8a);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
output.write(0x80);
output.write(0x8a);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80+WebSocketConnectionD07.OP_BINARY,input.read());
assertEquals(20,input.read());
lookFor("01234567890123456789",input);
}
@Test
public void testMaxBinarySize() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(100000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(15);
output.write(0x02);
output.write(0x8a);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
output.write(0x80);
output.write(0x8a);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD07.OP_CLOSE,input.read());
assertEquals(19,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Message size > 15",input);
}
@Test
public void testMaxBinarySize2() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(100000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(15);
output.write(0x02);
output.write(0x94);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD07.OP_CLOSE,input.read());
assertEquals(19,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Message size > 15",input);
}
@Test
public void testIdle() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(10000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
assertEquals(0x81,input.read());
assertEquals(0x0f,input.read());
lookFor("sent on connect",input);
assertEquals((byte)0x88,(byte)input.read());
assertEquals(0x06,input.read());
assertEquals(1000/0x100,input.read());
assertEquals(1000%0x100,input.read());
lookFor("Idle",input);
// respond to close
output.write(0x88^0xff);
output.write(0x80^0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.flush();
assertTrue(_serverWebSocket.awaitDisconnected(5000));
try
{
_serverWebSocket.connection.sendMessage("Don't send");
assertTrue(false);
}
catch(IOException e)
{
assertTrue(true);
}
}
@Test
public void testClose() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Version: 7\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
assertEquals(0x81,input.read());
assertEquals(0x0f,input.read());
lookFor("sent on connect",input);
socket.close();
assertTrue(_serverWebSocket.awaitDisconnected(500));
try
{
_serverWebSocket.connection.sendMessage("Don't send");
assertTrue(false);
}
catch(IOException e)
{
assertTrue(true);
}
}
@Test
public void testParserAndGenerator() throws Exception
{
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
final AtomicReference<String> received = new AtomicReference<String>();
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD07 gen = new WebSocketGeneratorD07(new WebSocketBuffers(8096),endp,null);
byte[] data = message.getBytes(StringUtil.__UTF8);
gen.addFrame((byte)0x8,(byte)0x4,data,0,data.length,1000);
endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096);
WebSocketParserD07 parser = new WebSocketParserD07(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler()
{
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
received.set(buffer.toString());
}
public void close(int code,String message)
{
}
},false);
parser.parseNext();
assertEquals(message,received.get());
}
@Test
public void testParserAndGeneratorMasked() throws Exception
{
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
final AtomicReference<String> received = new AtomicReference<String>();
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD07.MaskGen maskGen = new WebSocketGeneratorD07.RandomMaskGen();
WebSocketGeneratorD07 gen = new WebSocketGeneratorD07(new WebSocketBuffers(8096),endp,maskGen);
byte[] data = message.getBytes(StringUtil.__UTF8);
gen.addFrame((byte)0x8,(byte)0x1,data,0,data.length,1000);
endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096);
WebSocketParserD07 parser = new WebSocketParserD07(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler()
{
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
received.set(buffer.toString());
}
public void close(int code,String message)
{
}
},true);
parser.parseNext();
assertEquals(message,received.get());
}
private void lookFor(String string,InputStream in)
throws IOException
{
String orig=string;
Utf8StringBuilder scanned=new Utf8StringBuilder();
try
{
while(true)
{
int b = in.read();
if (b<0)
throw new EOFException();
scanned.append((byte)b);
assertEquals("looking for\""+orig+"\" in '"+scanned+"'",(int)string.charAt(0),b);
if (string.length()==1)
break;
string=string.substring(1);
}
}
catch(IOException e)
{
System.err.println("IOE while looking for \""+orig+"\" in '"+scanned+"'");
throw e;
}
}
private void skipTo(String string,InputStream in)
throws IOException
{
int state=0;
while(true)
{
int b = in.read();
if (b<0)
throw new EOFException();
if (b==string.charAt(state))
{
state++;
if (state==string.length())
break;
}
else
state=0;
}
}
private static class TestWebSocket implements WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage
{
boolean _onConnect=false;
boolean _echo=true;
boolean _aggregate=false;
private final CountDownLatch connected = new CountDownLatch(1);
private final CountDownLatch disconnected = new CountDownLatch(1);
private volatile FrameConnection connection;
public Connection getConnection()
{
return connection;
}
public void onHandshake(FrameConnection connection)
{
this.connection = connection;
}
public void onOpen(Connection connection)
{
if (_onConnect)
{
try
{
connection.sendMessage("sent on connect");
}
catch(IOException e)
{
e.printStackTrace();
}
}
connected.countDown();
}
private boolean awaitConnected(long time) throws InterruptedException
{
return connected.await(time, TimeUnit.MILLISECONDS);
}
private boolean awaitDisconnected(long time) throws InterruptedException
{
return disconnected.await(time, TimeUnit.MILLISECONDS);
}
public void onClose(int code,String message)
{
disconnected.countDown();
}
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
if (_echo)
{
switch(opcode)
{
case WebSocketConnectionD07.OP_CLOSE:
case WebSocketConnectionD07.OP_PING:
case WebSocketConnectionD07.OP_PONG:
break;
default:
try
{
connection.sendFrame(flags,opcode,data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
return false;
}
public void onMessage(byte[] data, int offset, int length)
{
if (_aggregate)
{
try
{
connection.sendMessage(data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
public void onMessage(String data)
{
if (_aggregate)
{
try
{
connection.sendMessage(data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
}

View File

@ -0,0 +1,321 @@
package org.eclipse.jetty.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http.HttpHeaderValues;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketParserD07Test
{
private MaskedByteArrayBuffer _in;
private Handler _handler;
private WebSocketParser _parser;
private byte[] _mask = new byte[] {(byte)0x00,(byte)0xF0,(byte)0x0F,(byte)0xFF};
private int _m;
class MaskedByteArrayBuffer extends ByteArrayBuffer
{
MaskedByteArrayBuffer()
{
super(4096);
}
public void sendMask()
{
super.poke(putIndex(),_mask,0,4);
super.setPutIndex(putIndex()+4);
_m=0;
}
@Override
public int put(Buffer src)
{
return put(src.asArray(),0,src.length());
}
public void putUnmasked(byte b)
{
super.put(b);
}
@Override
public void put(byte b)
{
super.put((byte)(b^_mask[_m++%4]));
}
@Override
public int put(byte[] b, int offset, int length)
{
byte[] mb = new byte[b.length];
final int end=offset+length;
for (int i=offset;i<end;i++)
{
mb[i]=(byte)(b[i]^_mask[_m++%4]);
}
return super.put(mb,offset,length);
}
@Override
public int put(byte[] b)
{
return put(b,0,b.length);
}
};
@Before
public void setUp() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
endPoint.setNonBlocking(true);
_handler = new Handler();
_parser=new WebSocketParserD07(buffers, endPoint,_handler,true);
_in = new MaskedByteArrayBuffer();
endPoint.setIn(_in);
}
@Test
public void testCache() throws Exception
{
assertEquals(HttpHeaderValues.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValues.CACHE.lookup("Upgrade")).getOrdinal());
}
@Test
public void testFlagsOppcode() throws Exception
{
_in.putUnmasked((byte)0xff);
_in.putUnmasked((byte)0x80);
_in.sendMask();
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(0xf,_handler._flags);
assertEquals(0xf,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortText() throws Exception
{
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|11));
_in.sendMask();
_in.put("Hello World".getBytes(StringUtil.__UTF8));
// System.err.println("tosend="+TypeUtil.toHexString(_in.asArray()));
int progress =_parser.parseNext();
assertEquals(18,progress);
assertEquals("Hello World",_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortUtf8() throws Exception
{
String string = "Hell\uFF4f W\uFF4Frld";
byte[] bytes = string.getBytes("UTF-8");
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|bytes.length));
_in.sendMask();
_in.put(bytes);
int progress =_parser.parseNext();
assertEquals(bytes.length+7,progress);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testMediumText() throws Exception
{
String string = "Hell\uFF4f Medium W\uFF4Frld ";
for (int i=0;i<4;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes(StringUtil.__UTF8);
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(bytes.length>>8));
_in.putUnmasked((byte)(bytes.length&0xff));
_in.sendMask();
_in.put(bytes);
int progress =_parser.parseNext();
assertEquals(bytes.length+9,progress);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testLongText() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(0x20000);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
WebSocketParser parser=new WebSocketParserD07(buffers, endPoint,_handler,false);
ByteArrayBuffer in = new ByteArrayBuffer(0x20000);
endPoint.setIn(in);
String string = "Hell\uFF4f Big W\uFF4Frld ";
for (int i=0;i<12;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
_in.sendMask();
in.put((byte)0x84);
in.put((byte)0x7F);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)(bytes.length>>16));
in.put((byte)((bytes.length>>8)&0xff));
in.put((byte)(bytes.length&0xff));
in.put(bytes);
int progress =parser.parseNext();
assertEquals(bytes.length+11,progress);
assertEquals(string,_handler._data.get(0));
assertTrue(parser.isBufferEmpty());
assertTrue(parser.getBuffer()==null);
}
@Test
public void testShortFragmentTest() throws Exception
{
_in.putUnmasked((byte)0x01);
_in.putUnmasked((byte)0x86);
_in.sendMask();
_in.put("Hello ".getBytes(StringUtil.__UTF8));
_in.putUnmasked((byte)0x80);
_in.putUnmasked((byte)0x85);
_in.sendMask();
_in.put("World".getBytes(StringUtil.__UTF8));
int progress =_parser.parseNext();
assertEquals(24,progress);
assertEquals(0,_handler._data.size());
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
progress =_parser.parseNext();
assertEquals(1,progress);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testFrameTooLarge() throws Exception
{
// Buffers are only 1024, so this frame is too large
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)(0x80|0x7E));
_in.putUnmasked((byte)(2048>>8));
_in.putUnmasked((byte)(2048&0xff));
_in.sendMask();
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(WebSocketConnectionD07.CLOSE_LARGE,_handler._code);
for (int i=0;i<2048;i++)
_in.put((byte)'a');
progress =_parser.parseNext();
assertEquals(2048,progress);
assertEquals(0,_handler._data.size());
assertEquals(0,_handler._utf8.length());
_handler._code=0;
_handler._message=null;
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)0xFE);
_in.putUnmasked((byte)(1024>>8));
_in.putUnmasked((byte)(1024&0xff));
_in.sendMask();
for (int i=0;i<1024;i++)
_in.put((byte)'a');
progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(1,_handler._data.size());
assertEquals(1024,_handler._data.get(0).length());
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
public List<String> _data = new ArrayList<String>();
private byte _flags;
private byte _opcode;
int _code;
String _message;
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
_flags=flags;
_opcode=opcode;
if ((flags&0x8)==0)
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
else if (_utf8.length()==0)
_data.add(buffer.toString("utf-8"));
else
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_data.add(_utf8.toString());
_utf8.reset();
}
}
public void close(int code,String message)
{
_code=code;
_message=message;
}
}
}