337685 Update websocket API in preparation for draft -07

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2877 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2011-03-10 13:15:27 +00:00
parent a1c3006519
commit e20fa91252
37 changed files with 1749 additions and 889 deletions

View File

@ -3,6 +3,9 @@ jetty-7.3.2-SNAPSHOT
+ Ensure generated fragment names are unique
+ 339187 In the OSGi manifest of the jetty-all-server aggregate, mark javax.annotation as optional
jetty-7.3.2-SNAPSHOT
+ 337685 Update websocket API in preparation for draft -07
jetty-7.3.1.v20110307 7 March 2011
+ 316382 Support a more strict SSL option with certificates
+ 333481 Handle UCS-4 codepoints in decode and encode

View File

@ -70,34 +70,26 @@ public class WebSocketUpgradeTest extends TestCase
public void testGetWithContentExchange() throws Exception
{
final WebSocket clientWS = new WebSocket()
final WebSocket clientWS = new WebSocket.OnTextMessage()
{
Outbound _outbound;
public void onConnect(Outbound outbound)
Connection _connection;
public void onDisconnect(int closeCode, String message)
{
_outbound=outbound;
}
public void onConnect(Connection connection)
{
_connection=connection;
_results.add("clientWS.onConnect");
_results.add(_outbound);
_results.add(_connection);
}
public void onDisconnect()
{
}
public void onMessage(byte frame, String data)
public void onMessage(String data)
{
_results.add("clientWS.onMessage");
_results.add(data);
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
}
};
@ -217,29 +209,25 @@ public class WebSocketUpgradeTest extends TestCase
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket
class TestWebSocket implements WebSocket.OnTextMessage
{
Outbound _outbound;
Connection _connection;
public void onConnect(Outbound outbound)
public void onConnect(Connection connection)
{
_outbound=outbound;
_connection=connection;
_webSockets.add(this);
_results.add("serverWS.onConnect");
_results.add(this);
}
public void onMessage(byte frame, byte[] data,int offset, int length)
{
}
public void onMessage(final byte frame, final String data)
public void onMessage(final String data)
{
_results.add("serverWS.onMessage");
_results.add(data);
}
public void onDisconnect()
public void onDisconnect(int code, String message)
{
_results.add("onDisconnect");
_webSockets.remove(this);
@ -247,11 +235,7 @@ public class WebSocketUpgradeTest extends TestCase
public void sendMessage(String msg) throws IOException
{
_outbound.sendMessage(msg);
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
_connection.sendMessage(msg);
}
}
}

View File

@ -15,6 +15,7 @@ package org.eclipse.jetty.io.nio;
import java.io.IOException;
import java.nio.channels.CancelledKeyException;
import java.nio.channels.ClosedSelectorException;
import java.nio.channels.SelectableChannel;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
@ -606,6 +607,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
});
}
}
catch (ClosedSelectorException e)
{
if (isRunning())
Log.warn(e);
else
Log.ignore(e);
}
catch (CancelledKeyException e)
{
Log.ignore(e);

View File

@ -383,6 +383,12 @@ public class TypeUtil
}
}
/* ------------------------------------------------------------ */
public static String toHexString(byte b)
{
return toHexString(new byte[]{b}, 0, 1);
}
/* ------------------------------------------------------------ */
public static String toHexString(byte[] b)
{

View File

@ -48,6 +48,25 @@ public class Utf8StringBuilder
for (int i=offset; i<end;i++)
append(b[i]);
}
/**
* @param b
* @param offset
* @param length
* @param maxChars The maximum characters allowed in the builder
* @return true if all bytes appended
*/
public boolean append(byte[] b,int offset, int length, int maxChars)
{
int end=offset+length;
for (int i=offset; i<end;i++)
{
if (length()>maxChars)
return false;
append(b[i]);
}
return true;
}
public void append(byte b)
{

View File

@ -1,55 +0,0 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
final class FrameHandlerD0 implements WebSocketParser.FrameHandler
{
final WebSocket _websocket;
final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
FrameHandlerD0(WebSocket websocket)
{
_websocket=websocket;
}
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
{
assert more==false;
try
{
byte[] array=buffer.array();
if (opcode==0)
{
_websocket.onMessage(opcode,buffer.toString("utf-8"));
}
else
{
_websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length());
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
}

View File

@ -1,94 +0,0 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
final class FrameHandlerD1 implements WebSocketParser.FrameHandler
{
public final static byte PING=1;
public final static byte PONG=1;
final WebSocketConnectionD00 _connection;
final WebSocket _websocket;
final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
boolean _fragmented=false;
FrameHandlerD1(WebSocketConnectionD00 connection, WebSocket websocket)
{
_connection=connection;
_websocket=websocket;
}
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
{
try
{
byte[] array=buffer.array();
if (opcode==0)
{
if (more)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_fragmented=true;
}
else if (_fragmented)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_websocket.onMessage(opcode,_utf8.toString());
_utf8.reset();
_fragmented=false;
}
else
{
_websocket.onMessage(opcode,buffer.toString("utf-8"));
}
}
else if (opcode==PING)
{
_connection.sendMessage(PONG,buffer.array(),buffer.getIndex(),buffer.length());
}
else if (opcode==PONG)
{
}
else
{
if (more)
{
_websocket.onFragment(true,opcode,array,buffer.getIndex(),buffer.length());
}
else if (_fragmented)
{
_websocket.onFragment(false,opcode,array,buffer.getIndex(),buffer.length());
}
else
{
_websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length());
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
}

View File

@ -0,0 +1,302 @@
package org.eclipse.jetty.websocket;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.security.SecureRandom;
import java.util.Arrays;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
/**
* @version $Revision$ $Date$
*/
public class TestClient
{
private final static Random __random = new SecureRandom();
private final String _host;
private final int _port;
private final String _protocol;
private int _size=64;
private final Socket _socket;
private final BufferedWriter _output;
private final BufferedReader _input;
private final SocketEndPoint _endp;
private final WebSocketGeneratorD06 _generator;
private final WebSocketParserD06 _parser;
private int _sent;
private int _received;
private long _totalTime;
private long _minDuration=Long.MAX_VALUE;
private long _maxDuration=Long.MIN_VALUE;
private long _start;
private BlockingQueue<Long> _starts = new LinkedBlockingQueue<Long>();
private BlockingQueue<String> _pending = new LinkedBlockingQueue<String>();
private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler()
{
public synchronized void onFrame(byte flags, byte opcode, Buffer buffer)
{
try
{
if (opcode == WebSocketConnectionD06.OP_CLOSE)
{
byte[] data=buffer.asArray();
System.err.println("CLOSED: "+((0xff&data[0])*0x100+(0xff&data[1]))+" "+new String(data,2,data.length-2,StringUtil.__UTF8));
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,data,0,data.length,_socket.getSoTimeout());
_generator.flush(_socket.getSoTimeout());
_socket.shutdownOutput();
_socket.close();
return;
}
Long start=_starts.take();
String data=_pending.take();
while (!data.equals(TypeUtil.toHexString(buffer.asArray())) && !_starts.isEmpty() && !_pending.isEmpty())
{
// Missed response
start=_starts.take();
data=_pending.take();
}
_received++;
long duration = System.nanoTime()-start.longValue();
if (duration>_maxDuration)
_maxDuration=duration;
if (duration<_minDuration)
_minDuration=duration;
_totalTime+=duration;
System.out.printf("%d bytes from %s: req=%d time=%.1fms opcode=0x%s\n",buffer.length(),_host,_received,((double)duration/1000000.0),TypeUtil.toHexString(opcode));
}
catch(Exception e)
{
e.printStackTrace();
}
}
public void close(int code,String message)
{
}
};
public TestClient(String host, int port,String protocol, int timeoutMS) throws IOException
{
_host=host;
_port=port;
_protocol=protocol;
_socket = new Socket(host, port);
_socket.setSoTimeout(timeoutMS);
_output = new BufferedWriter(new OutputStreamWriter(_socket.getOutputStream(), "ISO-8859-1"));
_input = new BufferedReader(new InputStreamReader(_socket.getInputStream(), "ISO-8859-1"));
_endp=new SocketEndPoint(_socket);
_generator = new WebSocketGeneratorD06(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD06.FixedMaskGen());
_parser = new WebSocketParserD06(new WebSocketBuffers(32*1024),_endp,_handler,false);
}
public int getSize()
{
return _size;
}
public void setSize(int size)
{
_size = size;
}
private void open() throws IOException
{
System.out.println("Jetty WebSocket PING "+_host+":"+_port+
" ("+_socket.getRemoteSocketAddress()+") " +_size+" bytes of data.");
byte[] key = new byte[16];
__random.nextBytes(key);
_output.write("GET /chat HTTP/1.1\r\n"+
"Host: "+_host+":"+_port+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+new String(B64Code.encode(key))+"\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: "+_protocol+"\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n");
_output.flush();
String responseLine = _input.readLine();
if(!responseLine.startsWith("HTTP/1.1 101 Switching Protocols"))
throw new IOException(responseLine);
// Read until we find Response key
String line;
boolean accepted=false;
String protocol="";
while ((line = _input.readLine()) != null)
{
if (line.length() == 0)
break;
if (line.startsWith("Sec-WebSocket-Accept:"))
{
String accept=line.substring(21).trim();
accepted=accept.equals(WebSocketConnectionD06.hashKey(new String(B64Code.encode(key))));
}
else if (line.startsWith("Sec-WebSocket-Protocol:"))
{
protocol=line.substring(24).trim();
}
}
if (!accepted)
throw new IOException("Bad Sec-WebSocket-Accept");
System.out.println("handshake OK for protocol '"+protocol+"'");
new Thread()
{
public void run()
{
while (_endp.isOpen())
_parser.parseNext();
}
}.start();
}
public void ping(int count,byte opcode,int fragment)
{
_start=System.currentTimeMillis();
for (int i=0;i<count && !_socket.isClosed();i++)
{
if (_socket.isClosed())
break;
try
{
byte data[] = new byte[_size];
__random.nextBytes(data);
_starts.add(System.nanoTime());
_pending.add(TypeUtil.toHexString(data));
int off=0;
int len=data.length;
if (fragment>0&& len>fragment)
len=fragment;
while(off<data.length)
{
_generator.addFrame((byte)(off==0?0x8:0),(byte)(off==0?opcode:WebSocketConnectionD06.OP_CONTINUATION),data,off,len,_socket.getSoTimeout());
_sent++;
off+=len;
if(data.length-off>len)
len=data.length-off;
}
_generator.flush(_socket.getSoTimeout());
Thread.sleep(1000);
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
}
public void dump() throws IOException
{
_socket.close();
long duration=System.currentTimeMillis()-_start;
System.out.println("--- "+_host+" websocket ping statistics using 1 connection ---");
System.out.println(_sent+" packets transmitted, "+_received+" received, "+
(_sent>0?String.format("%d",100*(_sent-_received)/_sent)+"% loss, ":"")+
"time "+duration+"ms");
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",_minDuration/1000000.0,_received==0?0.0:(_totalTime/_received/1000000.0),_maxDuration/1000000.0);
}
private static void usage(String[] args)
{
System.err.println("ERROR: "+Arrays.asList(args));
System.err.println("USAGE: java -cp CLASSPATH "+TestClient.class+" [ OPTIONS ]");
System.err.println(" -h|--host HOST (default localhost)");
System.err.println(" -p|--port PORT (default 8080)");
System.err.println(" -v|--verbose");
System.err.println(" -c|--count n (default 10)");
System.err.println(" -s|--size n (default 64)");
System.err.println(" -f|--fragment n (default 4000) ");
System.err.println(" -P|--protocol echo|echo-assemble|echo-fragment");
System.exit(1);
}
public static void main(String[] args)
{
try
{
String host="localhost";
int port=8080;
boolean verbose=false;
String protocol=null;
int count=10;
int size=64;
int fragment=4000;
for (int i=0;i<args.length;i++)
{
String a=args[i];
if ("-p".equals(a)||"--port".equals(a))
port=Integer.parseInt(args[++i]);
else if ("-h".equals(a)||"--host".equals(a))
port=Integer.parseInt(args[++i]);
else if ("-c".equals(a)||"--count".equals(a))
count=Integer.parseInt(args[++i]);
else if ("-s".equals(a)||"--size".equals(a))
size=Integer.parseInt(args[++i]);
else if ("-f".equals(a)||"--fragment".equals(a))
fragment=Integer.parseInt(args[++i]);
else if ("-P".equals(a)||"--protocol".equals(a))
protocol=args[++i];
else if ("-v".equals(a)||"--verbose".equals(a))
verbose=true;
else if (a.startsWith("-"))
usage(args);
}
TestClient client = new TestClient(host,port,protocol==null?null:("org.ietf.websocket.test-"+protocol),10000);
client.setSize(size);
try
{
client.open();
if ("echo".equals(protocol))
client.ping(count,WebSocketConnectionD06.OP_BINARY,fragment);
else
client.ping(count,WebSocketConnectionD06.OP_PING,-1);
}
finally
{
client.dump();
}
}
catch (Exception e)
{
Log.warn(e);
}
}
}

View File

@ -0,0 +1,287 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.Connection;
public class TestServer extends Server
{
boolean _verbose;
WebSocket _websocket;
SelectChannelConnector _connector;
WebSocketHandler _handler;
ConcurrentLinkedQueue<TestWebSocket> _webSockets = new ConcurrentLinkedQueue<TestWebSocket>();
public TestServer(int port)
{
_connector = new SelectChannelConnector();
_connector.setPort(port);
addConnector(_connector);
_handler = new WebSocketHandler()
{
@Override
protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
if ("org.ietf.websocket.test-echo".equals(protocol) || "echo".equals(protocol) || "lws-mirror-protocol".equals(protocol))
{
_websocket = new TestEchoWebSocket();
}
else if ("org.ietf.websocket.test-echo-broadcast".equals(protocol))
{
_websocket = new TestEchoBroadcastWebSocket();
}
else if ("org.ietf.websocket.test-echo-assemble".equals(protocol))
{
}
else if ("org.ietf.websocket.test-echo-fragment".equals(protocol))
{
}
else if ("org.ietf.websocket.test-consume".equals(protocol))
{
}
else if ("org.ietf.websocket.test-produce".equals(protocol))
{
}
else if (protocol==null)
{
_websocket = new TestWebSocket();
}
return _websocket;
}
};
setHandler(_handler);
}
public boolean isVerbose()
{
return _verbose;
}
public void setVerbose(boolean verbose)
{
_verbose = verbose;
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl
{
protected Connection _connection;
public Connection getOutbound()
{
return _connection;
}
public void onConnect(Connection connection)
{
_connection = connection;
_webSockets.add(this);
}
public void onDisconnect(int code,String message)
{
_webSockets.remove(this);
}
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
if (_verbose)
System.err.printf("%s#onFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),TypeUtil.toHexString(data,offset,length));
return false;
}
public boolean onControl(byte controlCode, byte[] data, int offset, int length)
{
if (_verbose)
System.err.printf("%s#onControl %s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(controlCode),TypeUtil.toHexString(data,offset,length));
return false;
}
public void onMessage(String data)
{
if (_verbose)
System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),data);
}
public void onMessage(byte[] data, int offset, int length)
{
if (_verbose)
System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(data,offset,length));
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoWebSocket extends TestWebSocket
{
@Override
public void onConnect(Connection connection)
{
super.onConnect(connection);
connection.setMaxTextMessageSize(-1);
connection.setMaxBinaryMessageSize(-1);
}
@Override
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
super.onFrame(flags,opcode,data,offset,length);
try
{
switch(opcode)
{
case WebSocketConnectionD06.OP_CLOSE:
case WebSocketConnectionD06.OP_PING:
case WebSocketConnectionD06.OP_PONG:
break;
default:
getOutbound().sendFrame(flags,opcode,data,offset,length);
}
}
catch (IOException e)
{
e.printStackTrace();
}
return false;
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoBroadcastWebSocket extends TestWebSocket
{
@Override
public void onMessage(byte[] data, int offset, int length)
{
super.onMessage(data,offset,length);
for (TestWebSocket ws : _webSockets)
{
try
{
ws.getOutbound().sendMessage(data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
@Override
public void onMessage(final String data)
{
super.onMessage(data);
for (TestWebSocket ws : _webSockets)
{
try
{
ws.getOutbound().sendMessage(data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestEchoAssembleWebSocket extends TestWebSocket
{
@Override
public void onConnect(Connection connection)
{
super.onConnect(connection);
connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024);
}
@Override
public void onMessage(byte[] data, int offset, int length)
{
super.onMessage(data,offset,length);
try
{
getOutbound().sendMessage(data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
}
}
@Override
public void onMessage(final String data)
{
super.onMessage(data);
try
{
getOutbound().sendMessage(data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
private static void usage()
{
System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]");
System.err.println(" -p|--port PORT ");
System.err.println(" -v|--verbose ");
System.exit(1);
}
public static void main(String[] args)
{
try
{
int port=8080;
boolean verbose=false;
for (int i=0;i<args.length;i++)
{
String a=args[i];
if ("-p".equals(a)||"--port".equals(a))
port=Integer.parseInt(args[++i]);
else if ("-v".equals(a)||"--verbose".equals(a))
verbose=true;
else if (a.startsWith("-"))
usage();
}
TestServer server = new TestServer(port);
server.setVerbose(verbose);
server.start();
server.join();
}
catch (Exception e)
{
Log.warn(e);
}
}
}

View File

@ -16,39 +16,55 @@ package org.eclipse.jetty.websocket;
import java.io.IOException;
public interface WebSocket
{
@Deprecated
public final byte LENGTH_FRAME=(byte)0x80;
@Deprecated
public final byte SENTINEL_FRAME=(byte)0x00;
{
void onConnect(Connection outbound);
void onDisconnect(int closeCode, String message);
public final static byte OP_CONTINUATION = 0x00;
public final static byte OP_CLOSE = 0x01;
public final static byte OP_PING = 0x02;
public final static byte OP_PONG = 0x03;
public final static byte OP_TEXT = 0x04;
public final static byte OP_BINARY = 0x05;
interface OnTextMessage extends WebSocket
{
void onMessage(String data);
}
public final static int CLOSE_NORMAL=1000;
public final static int CLOSE_SHUTDOWN=1001;
public final static int CLOSE_PROTOCOL=1002;
public final static int CLOSE_DATA=1003;
public final static int CLOSE_LARGE=1004;
interface OnBinaryMessage extends WebSocket
{
void onMessage(byte[] data, int offset, int length);
}
void onConnect(Outbound outbound);
void onMessage(byte opcode,String data);
void onFragment(boolean more,byte opcode,byte[] data, int offset, int length);
void onMessage(byte opcode,byte[] data, int offset, int length);
void onDisconnect(); // TODO add code
interface OnControl extends WebSocket
{
boolean onControl(byte controlCode,byte[] data, int offset, int length);
}
public interface Outbound
interface OnFrame extends WebSocket
{
boolean onFrame(byte flags,byte opcode,byte[] data, int offset, int length);
}
public interface Connection
{
void sendMessage(String data) throws IOException;
void sendMessage(byte opcode,String data) throws IOException;
void sendMessage(byte opcode,byte[] data, int offset, int length) throws IOException;
void sendFragment(boolean more,byte opcode,byte[] data, int offset, int length) throws IOException;
void disconnect();
void disconnect(int code,String message);
void sendMessage(byte[] data, int offset, int length) throws IOException;
void sendControl(byte control,byte[] data, int offset, int length) throws IOException;
void sendFrame(byte flags,byte opcode,byte[] data, int offset, int length) throws IOException;
void disconnect(int closeCode,String message);
boolean isOpen();
boolean isMore(byte flags);
void setMaxTextMessageSize(int size);
void setMaxBinaryMessageSize(int size);
/**
* Size in characters of the maximum text message to be received
* @return <0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters
*/
int getMaxTextMessageSize();
/**
* Size in bytes of the maximum binary message to be received
* @return <0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer
*/
int getMaxBinaryMessageSize();
}
}

View File

@ -28,10 +28,16 @@ import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.Outbound
public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.Connection
{
public final static byte LENGTH_FRAME=(byte)0x80;
public final static byte SENTINEL_FRAME=(byte)0x00;
final IdleCheck _idle;
final WebSocketParser _parser;
final WebSocketGenerator _generator;
@ -63,7 +69,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
{
case 1:
_generator = new WebSocketGeneratorD01(buffers, _endp);
_parser = new WebSocketParserD01(buffers, endpoint, new FrameHandlerD1(this,_websocket));
_parser = new WebSocketParserD01(buffers, endpoint, new FrameHandlerD1(_websocket));
break;
default:
_generator = new WebSocketGeneratorD00(buffers, _endp);
@ -215,59 +221,60 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
public void closed()
{
_websocket.onDisconnect();
_websocket.onDisconnect(0,"");
}
/* ------------------------------------------------------------ */
/**
* {@inheritDoc}
*/
public void sendMessage(String content) throws IOException
{
sendMessage(WebSocket.SENTINEL_FRAME,content);
}
/* ------------------------------------------------------------ */
/**
* {@inheritDoc}
*/
public void sendMessage(byte frame, String content) throws IOException
{
_generator.addFrame(frame,content,_endp.getMaxIdleTime());
byte[] data = content.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0,SENTINEL_FRAME,data,0,data.length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* {@inheritDoc}
*/
public void sendMessage(byte opcode, byte[] content, int offset, int length) throws IOException
public void sendMessage(byte[] data, int offset, int length) throws IOException
{
_generator.addFrame(opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.addFrame((byte)0,LENGTH_FRAME,data,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public boolean isMore(byte flags)
{
return (flags&0x8) != 0;
}
/* ------------------------------------------------------------ */
/**
* {@inheritDoc}
*/
public void sendFragment(boolean more,byte opcode, byte[] content, int offset, int length) throws IOException
public void sendControl(byte code, byte[] content, int offset, int length) throws IOException
{
_generator.addFragment(!more,opcode,content,offset,length,_endp.getMaxIdleTime());
}
/* ------------------------------------------------------------ */
public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
{
_generator.addFrame((byte)0,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public void disconnect(int code, String message)
{
throw new UnsupportedOperationException();
}
/* ------------------------------------------------------------ */
public void disconnect()
{
try
@ -379,4 +386,148 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
}
}
public void setMaxTextMessageSize(int size)
{
}
public void setMaxBinaryMessageSize(int size)
{
}
public int getMaxTextMessageSize()
{
return -1;
}
public int getMaxBinaryMessageSize()
{
return -1;
}
class FrameHandlerD0 implements WebSocketParser.FrameHandler
{
final WebSocket _websocket;
final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
FrameHandlerD0(WebSocket websocket)
{
_websocket=websocket;
}
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
try
{
byte[] array=buffer.array();
if (opcode==0)
{
if (_websocket instanceof WebSocket.OnTextMessage)
((WebSocket.OnTextMessage)_websocket).onMessage(buffer.toString(StringUtil.__UTF8));
}
else
{
if (_websocket instanceof WebSocket.OnBinaryMessage)
((WebSocket.OnBinaryMessage)_websocket).onMessage(array,buffer.getIndex(),buffer.length());
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
public void close(int code,String message)
{
disconnect(code,message);
}
}
class FrameHandlerD1 implements WebSocketParser.FrameHandler
{
public final static byte PING=1;
public final static byte PONG=1;
final WebSocket _websocket;
final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
boolean _fragmented=false;
FrameHandlerD1(WebSocket websocket)
{
_websocket=websocket;
}
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
try
{
byte[] array=buffer.array();
if (opcode==0)
{
if (isMore(flags))
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_fragmented=true;
}
else if (_fragmented)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
if (_websocket instanceof WebSocket.OnTextMessage)
((WebSocket.OnTextMessage)_websocket).onMessage(_utf8.toString());
_utf8.reset();
_fragmented=false;
}
else
{
if (_websocket instanceof WebSocket.OnTextMessage)
((WebSocket.OnTextMessage)_websocket).onMessage(buffer.toString(StringUtil.__UTF8));
}
}
else if (opcode==PING)
{
sendFrame(flags,PONG,buffer.array(),buffer.getIndex(),buffer.length());
}
else if (opcode==PONG)
{
}
else
{
if (isMore(flags))
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length());
}
else if (_fragmented)
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length());
}
else
{
if (_websocket instanceof WebSocket.OnBinaryMessage)
((WebSocket.OnBinaryMessage)_websocket).onMessage(array,buffer.getIndex(),buffer.length());
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
public void close(int code,String message)
{
disconnect(code,message);
}
}
}

View File

@ -23,24 +23,67 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage;
import org.eclipse.jetty.websocket.WebSocket.OnControl;
public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
{
public final static byte OP_CONTINUATION = 0x00;
public final static byte OP_CLOSE = 0x01;
public final static byte OP_PING = 0x02;
public final static byte OP_PONG = 0x03;
public final static byte OP_TEXT = 0x04;
public final static byte OP_BINARY = 0x05;
public final static int CLOSE_NORMAL=1000;
public final static int CLOSE_SHUTDOWN=1001;
public final static int CLOSE_PROTOCOL=1002;
public final static int CLOSE_BADDATA=1003;
public final static int CLOSE_LARGE=1004;
public static boolean isLastFrame(int flags)
{
return (flags&0x8)!=0;
}
public static boolean isControlFrame(int opcode)
{
switch(opcode)
{
case OP_CLOSE:
case OP_PING:
case OP_PONG:
return true;
default:
return false;
}
}
private final static byte[] MAGIC;
private final static byte[] NORMAL_CLOSE=new byte[] { 1000/0xff, (byte)(1000%0xff) };
private final IdleCheck _idle;
private final WebSocketParser _parser;
private final WebSocketGenerator _generator;
private final WebSocket _websocket;
private final WebSocket _webSocket;
private final OnFrame _onFrame;
private final OnBinaryMessage _onBinaryMessage;
private final OnTextMessage _onTextMessage;
private final OnControl _onControl;
private boolean _closedIn;
private boolean _closedOut;
private int _maxTextMessageSize;
private int _maxBinaryMessageSize=-1;
static
{
@ -57,10 +100,13 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
private final WebSocketParser.FrameHandler _frameHandler= new WebSocketParser.FrameHandler()
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private ByteArrayBuffer _aggregate;
private byte _opcode=-1;
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
boolean more=(flags&0x8)==0;
synchronized(WebSocketConnectionD06.this)
{
// Ignore incoming after a close
@ -71,65 +117,89 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{
byte[] array=buffer.array();
// Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null)
{
if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
return;
}
if (_onControl!=null && isControlFrame(opcode))
{
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return;
}
switch(opcode)
{
case WebSocket.OP_CONTINUATION:
case WebSocketConnectionD06.OP_CONTINUATION:
{
// If text, append to the message buffer
if (_opcode==WebSocket.OP_TEXT)
if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
// If this is the last fragment, deliver the text buffer
if (!more)
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{
String msg =_utf8.toString();
// If this is the last fragment, deliver the text buffer
if (more && _onTextMessage!=null)
{
_opcode=-1;
String msg =_utf8.toString();
_utf8.reset();
_onTextMessage.onMessage(msg);
}
}
else
{
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
_websocket.onMessage(WebSocket.OP_TEXT,msg);
}
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{
if (_aggregate.space()<_aggregate.length())
{
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{
_aggregate.put(buffer);
// If this is the last fragment, deliver
if (!more && _onBinaryMessage!=null)
{
try
{
_onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
}
finally
{
_opcode=-1;
_aggregate.clear();
}
}
}
}
else
{
// deliver the non-text fragment
if (!more)
_opcode=-1;
_websocket.onFragment(more,_opcode,array,buffer.getIndex(),buffer.length());
}
break;
}
case WebSocket.OP_TEXT:
{
if (more)
{
// If this is a text fragment, append to buffer
_opcode=WebSocket.OP_TEXT;
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
}
else
{
// Deliver the message
_websocket.onMessage(opcode,buffer.toString(StringUtil.__UTF8));
}
break;
}
case WebSocket.OP_PING:
case WebSocketConnectionD06.OP_PING:
{
Log.debug("PING {}",this);
if (!_closedOut)
getOutbound().sendMessage(WebSocket.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
_connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
break;
}
case WebSocket.OP_PONG:
case WebSocketConnectionD06.OP_PONG:
{
Log.debug("PONG {}",this);
break;
}
case WebSocket.OP_CLOSE:
case WebSocketConnectionD06.OP_CLOSE:
{
int code=-1;
String message=null;
@ -143,15 +213,64 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
break;
}
case WebSocketConnectionD06.OP_TEXT:
{
if(_onTextMessage!=null)
{
if (more)
{
if (_connection.getMaxTextMessageSize()>=0)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD06.OP_TEXT;
else
{
_utf8.reset();
_opcode=-1;
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
}
}
}
else
{
// Deliver the message
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
}
}
break;
}
default:
{
if (more)
if (_onBinaryMessage!=null)
{
_opcode=opcode;
_websocket.onFragment(more,opcode,array,buffer.getIndex(),buffer.length());
if (more)
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{
_opcode=opcode;
if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer);
}
}
}
else
{
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
}
}
else
_websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length());
}
}
}
@ -166,34 +285,36 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
}
}
public void close(int code,String message)
{
}
public String toString()
{
return WebSocketConnectionD06.this.toString()+"FH";
}
};
private final WebSocket.Outbound _outbound = new WebSocket.Outbound()
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private final WebSocket.Connection _connection = new WebSocket.Connection()
{
volatile boolean _disconnecting;
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(java.lang.String)
*/
public void sendMessage(String content) throws IOException
{
sendMessage(WebSocket.OP_TEXT,content);
}
int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String)
*/
public synchronized void sendMessage(byte opcode, String content) throws IOException
public synchronized void sendMessage(String content) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame(opcode,content,_endp.getMaxIdleTime());
byte[] data = content.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
@ -203,11 +324,11 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int)
*/
public synchronized void sendMessage(byte opcode, byte[] content, int offset, int length) throws IOException
public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame(opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
@ -215,18 +336,34 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendFragment(boolean, byte, byte[], int, int)
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int)
*/
public void sendFragment(boolean more,byte opcode, byte[] content, int offset, int length) throws IOException
public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFragment(!more,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public boolean isMore(byte flags)
{
return (flags&0x8)==0;
}
/* ------------------------------------------------------------ */
public boolean isOpen()
@ -251,7 +388,31 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_disconnecting=true;
WebSocketConnectionD06.this.closeOut(1000,null);
}
/* ------------------------------------------------------------ */
public void setMaxTextMessageSize(int size)
{
_maxTextMessage=size;
}
/* ------------------------------------------------------------ */
public void setMaxBinaryMessageSize(int size)
{
_maxBinaryMessage=size;
}
/* ------------------------------------------------------------ */
public int getMaxTextMessageSize()
{
return _maxTextMessage;
}
/* ------------------------------------------------------------ */
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessage;
}
};
/* ------------------------------------------------------------ */
@ -273,7 +434,11 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_endp.setMaxIdleTime(maxIdleTime);
_websocket = websocket;
_webSocket = websocket;
_onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null;
_onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null;
_onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null;
_onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null;
_generator = new WebSocketGeneratorD06(buffers, _endp,null);
_parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true);
@ -299,12 +464,15 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{}
};
}
_maxTextMessageSize=buffers.getBufferSize();
_maxBinaryMessageSize=-1;
}
/* ------------------------------------------------------------ */
public WebSocket.Outbound getOutbound()
public WebSocket.Connection getConnection()
{
return _outbound;
return _connection;
}
/* ------------------------------------------------------------ */
@ -366,7 +534,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
@Override
public void idleExpired()
{
closeOut(WebSocket.CLOSE_NORMAL,"Idle");
closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle");
}
/* ------------------------------------------------------------ */
@ -378,7 +546,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
public void closed()
{
_websocket.onDisconnect();
_webSocket.onDisconnect(WebSocketConnectionD06.CLOSE_NORMAL,"");
}
/* ------------------------------------------------------------ */
@ -410,16 +578,14 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
{
if (_closedIn || _closedOut)
_endp.close();
else if (code<=0)
{
_generator.addFrame(WebSocket.OP_CLOSE,NORMAL_CLOSE,0,2,_endp.getMaxIdleTime());
}
else
else
{
if (code<=0)
code=WebSocketConnectionD06.CLOSE_NORMAL;
byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
bytes[0]=(byte)(code/0xff);
bytes[1]=(byte)(code%0xff);
_generator.addFrame(WebSocket.OP_CLOSE,bytes,0,bytes.length,_endp.getMaxIdleTime());
bytes[0]=(byte)(code/0x100);
bytes[1]=(byte)(code%0x100);
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length,_endp.getMaxIdleTime());
}
_generator.flush();
@ -433,7 +599,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_closedOut=true;
}
}
/* ------------------------------------------------------------ */
public void fillBuffersFrom(Buffer buffer)
@ -441,7 +606,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_parser.fill(buffer);
}
/* ------------------------------------------------------------ */
private void checkWriteable()
{
@ -471,7 +635,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
response.addHeader("Sec-WebSocket-Protocol",subprotocol);
response.sendError(101);
_websocket.onConnect(_outbound);
_webSocket.onConnect(_connection);
}
/* ------------------------------------------------------------ */
@ -489,6 +653,4 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
throw new RuntimeException(e);
}
}
}

View File

@ -133,4 +133,17 @@ public class WebSocketFactory
// Tell jetty about the new connection
request.setAttribute("org.eclipse.jetty.io.Connection",connection);
}
public static String[] parseProtocols(String protocol)
{
if (protocol==null)
return new String[]{null};
protocol=protocol.trim();
if (protocol==null || protocol.length()==0)
return new String[]{null};
String[] passed = protocol.split("\\s*,\\s*");
String[] protocols = new String[passed.length+1];
System.arraycopy(passed,0,protocols,0,passed.length);
return protocols;
}
}

View File

@ -24,8 +24,6 @@ public interface WebSocketGenerator
{
int flush() throws IOException;
boolean isBufferEmpty();
void addFrame(byte opcode, String content, int maxIdleTime) throws IOException;
void addFrame(byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException;
void addFragment(boolean last, byte opcode,byte[] content, int offset, int length, int maxIdleTime) throws IOException;
void addFrame(byte flags,byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException;
int flush(int maxIdleTime) throws IOException;
}

View File

@ -40,7 +40,7 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator
_endp=endp;
}
public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
public synchronized void addFrame(byte flags, byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
@ -98,7 +98,7 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator
private synchronized boolean isLengthFrame(byte frame)
{
return (frame & WebSocket.LENGTH_FRAME) == WebSocket.LENGTH_FRAME;
return (frame & WebSocketConnectionD00.LENGTH_FRAME) == WebSocketConnectionD00.LENGTH_FRAME;
}
private synchronized void bufferPut(byte datum, long blockFor) throws IOException
@ -110,12 +110,6 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator
expelBuffer(blockFor);
}
public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
{
byte[] bytes = content.getBytes("UTF-8");
addFrame(frame, bytes, 0, bytes.length, blockFor);
}
public synchronized int flush(int blockFor) throws IOException
{
return expelBuffer(blockFor);
@ -170,12 +164,5 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator
{
return _buffer==null || _buffer.length()==0;
}
public void addFragment(boolean last,byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException
{
if (!last)
throw new UnsupportedOperationException("fragmented");
addFrame(opcode,content,offset,length,maxIdleTime);
}
}

View File

@ -38,19 +38,8 @@ public class WebSocketGeneratorD01 implements WebSocketGenerator
_buffers=buffers;
_endp=endp;
}
public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException
{
addFrame(opcode,content,0,content.length,blockFor);
}
public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
{
addFragment(true,opcode,content,offset,length,blockFor);
}
public synchronized void addFragment(boolean last,byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
public synchronized void addFrame(byte flags,byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
@ -69,7 +58,7 @@ public class WebSocketGeneratorD01 implements WebSocketGenerator
fragment=_buffer.capacity()-10;
bufferPut((byte)(0x80|opcode), blockFor);
}
else if (last)
else if ((flags&0x8)==0)
bufferPut(opcode, blockFor);
else
bufferPut((byte)(0x80|opcode), blockFor);
@ -134,12 +123,6 @@ public class WebSocketGeneratorD01 implements WebSocketGenerator
expelBuffer(blockFor);
}
public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
{
byte[] bytes = content.getBytes("UTF-8");
addFrame(frame, bytes, 0, bytes.length, blockFor);
}
public synchronized int flush(int blockFor) throws IOException
{
return expelBuffer(blockFor);

View File

@ -20,6 +20,7 @@ import java.util.Random;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */
@ -108,34 +109,30 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator
_maskGen=maskGen;
}
public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException
{
_opsent=false;
addFrame(opcode,content,0,content.length,blockFor);
}
public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
{
_opsent=false;
addFragment(true,opcode,content,offset,length,blockFor);
}
public synchronized void addFragment(boolean last, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
{
// System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
if (_buffer==null)
_buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer();
boolean last=WebSocketConnectionD06.isLastFrame(flags);
opcode=(byte)(((0xf&flags)<<4)+0xf&opcode);
int space=(_maskGen!=null)?14:10;
do
{
opcode = _opsent?WebSocket.OP_CONTINUATION:(byte)(opcode & 0x0f);
opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode;
_opsent=true;
int payload=length;
if (payload+space>_buffer.capacity())
{
// We must fragement, so clear FIN bit
opcode&=(byte)0x7F; // Clear the FIN bit
payload=_buffer.capacity()-space;
}
else if (last)
opcode|=(byte)0x80; // Set the FIN bit
@ -150,7 +147,7 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator
_m=0;
_buffer.put(_mask);
}
// write the opcode and length
if (payload>0xffff)
{
@ -232,12 +229,6 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator
_buffer.put((byte)(data^_mask[+_m++%4]));
}
public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
{
byte[] bytes = content.getBytes("UTF-8");
addFrame(frame, bytes, 0, bytes.length, blockFor);
}
public synchronized int flush(int blockFor) throws IOException
{
return expelBuffer(blockFor);

View File

@ -24,7 +24,7 @@ import org.eclipse.jetty.server.handler.HandlerWrapper;
public abstract class WebSocketHandler extends HandlerWrapper
{
private WebSocketFactory _websocket;
private WebSocketFactory _webSocketFactory;
private int _bufferSize=8192;
private int _maxIdleTime=-1;
@ -53,7 +53,7 @@ public abstract class WebSocketHandler extends HandlerWrapper
*/
public int getMaxIdleTime()
{
return (int)(_websocket==null?_maxIdleTime:_websocket.getMaxIdleTime());
return (int)(_webSocketFactory==null?_maxIdleTime:_webSocketFactory.getMaxIdleTime());
}
/* ------------------------------------------------------------ */
@ -63,8 +63,8 @@ public abstract class WebSocketHandler extends HandlerWrapper
public void setMaxIdleTime(int maxIdleTime)
{
_maxIdleTime = maxIdleTime;
if (_websocket!=null)
_websocket.setMaxIdleTime(maxIdleTime);
if (_webSocketFactory!=null)
_webSocketFactory.setMaxIdleTime(maxIdleTime);
}
/* ------------------------------------------------------------ */
@ -74,9 +74,9 @@ public abstract class WebSocketHandler extends HandlerWrapper
@Override
protected void doStart() throws Exception
{
_websocket=new WebSocketFactory(_bufferSize);
_webSocketFactory=new WebSocketFactory(_bufferSize);
if (_maxIdleTime>=0)
_websocket.setMaxIdleTime(_maxIdleTime);
_webSocketFactory.setMaxIdleTime(_maxIdleTime);
super.doStart();
}
@ -88,7 +88,7 @@ public abstract class WebSocketHandler extends HandlerWrapper
protected void doStop() throws Exception
{
super.doStop();
_websocket=null;
_webSocketFactory=null;
}
/* ------------------------------------------------------------ */
@ -97,17 +97,27 @@ public abstract class WebSocketHandler extends HandlerWrapper
{
if ("websocket".equalsIgnoreCase(request.getHeader("Upgrade")))
{
String subprotocol=request.getHeader("Sec-WebSocket-Protocol");
if (subprotocol==null) // TODO remove once draft period is over
subprotocol=request.getHeader("WebSocket-Protocol");
WebSocket websocket=doWebSocketConnect(request,subprotocol);
String protocol=request.getHeader("Sec-WebSocket-Protocol");
if (protocol==null) // TODO remove once draft period is over
protocol=request.getHeader("WebSocket-Protocol");
WebSocket websocket=null;
for (String p :WebSocketFactory.parseProtocols(protocol))
{
websocket=doWebSocketConnect(request,p);
if (websocket!=null)
{
protocol=p;
break;
}
}
String host=request.getHeader("Host");
String origin=request.getHeader("Origin");
origin=checkOrigin(request,host,origin);
if (websocket!=null)
_websocket.upgrade(request,response,websocket,origin,subprotocol);
_webSocketFactory.upgrade(request,response,websocket,origin,protocol);
else
response.sendError(503);
}

View File

@ -30,7 +30,8 @@ public interface WebSocketParser
/* ------------------------------------------------------------ */
public interface FrameHandler
{
void onFrame(boolean more,byte flags, byte opcode, Buffer buffer);
void onFrame(byte flags, byte opcode, Buffer buffer);
void close(int code,String message);
}
Buffer getBuffer();

View File

@ -145,7 +145,7 @@ public class WebSocketParserD00 implements WebSocketParser
{
_state=STATE_START;
int l=_buffer.getIndex()-_buffer.markIndex()-1;
_handler.onFrame(false,(byte)0,_opcode,_buffer.sliceFromMark(l));
_handler.onFrame((byte)0,_opcode,_buffer.sliceFromMark(l));
_buffer.setMarkIndex(-1);
if (_buffer.length()==0)
{
@ -173,7 +173,7 @@ public class WebSocketParserD00 implements WebSocketParser
Buffer data=_buffer.sliceFromMark(_length);
_buffer.skip(_length);
_state=STATE_START;
_handler.onFrame(false,(byte)0, _opcode, data);
_handler.onFrame((byte)0, _opcode, data);
if (_buffer.length()==0)
{

View File

@ -52,7 +52,6 @@ public class WebSocketParserD01 implements WebSocketParser
private final FrameHandler _handler;
private State _state=State.START;
private Buffer _buffer;
private boolean _more;
private byte _flags;
private byte _opcode;
private int _count;
@ -142,7 +141,6 @@ public class WebSocketParserD01 implements WebSocketParser
b=_buffer.get();
_opcode=(byte)(b&0xf);
_flags=(byte)(b>>4);
_more=(_flags&8)!=0;
_state=State.LENGTH_7;
continue;
@ -195,7 +193,7 @@ public class WebSocketParserD01 implements WebSocketParser
if (_state==State.DATA && available>=_count)
{
_handler.onFrame(_more,_flags, _opcode, _buffer.get(_count));
_handler.onFrame(_flags, _opcode, _buffer.get(_count));
_count=0;
_state=State.START;

View File

@ -32,18 +32,19 @@ import org.eclipse.jetty.util.log.Log;
public class WebSocketParserD06 implements WebSocketParser
{
public enum State {
MASK(0), OPCODE(1), LENGTH_7(2), LENGTH_16(4), LENGTH_63(10), DATA(10);
START(0), MASK(4), OPCODE(1), LENGTH_7(1), LENGTH_16(2), LENGTH_63(8), DATA(0), SKIP(1);
int _minSize;
int _needs;
State(int minSize)
State(int needs)
{
_minSize=minSize;
_needs=needs;
}
int getMinSize()
int getNeeds()
{
return _minSize;
return _needs;
}
};
@ -54,12 +55,10 @@ public class WebSocketParserD06 implements WebSocketParser
private final boolean _masked;
private State _state;
private Buffer _buffer;
private boolean _fin;
private byte _flags;
private byte _opcode;
private int _count;
private int _bytesNeeded;
private long _length;
private Utf8StringBuilder _utf8;
private final byte[] _mask = new byte[4];
private int _m;
@ -77,7 +76,7 @@ public class WebSocketParserD06 implements WebSocketParser
_endp=endp;
_handler=handler;
_masked=masked;
_state=_masked?State.MASK:State.OPCODE;
_state=State.START;
}
/* ------------------------------------------------------------ */
@ -113,21 +112,21 @@ public class WebSocketParserD06 implements WebSocketParser
int available=_buffer.length();
// Fill buffer if we need a byte or need length
if (available < (_state.getMinSize() + (_masked?4:0)) || _state==State.DATA && available<_count)
while (available<(_state==State.SKIP?1:_bytesNeeded))
{
// compact to mark (set at start of data)
_buffer.compact();
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
throw new IllegalStateException("FULL");
throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity());
// catch IOExceptions (probably EOF) and try to parse what we have
try
{
int filled=_endp.isOpen()?_endp.fill(_buffer):-1;
if (filled<=0)
return total_filled;
return total_filled>0?total_filled:filled;
total_filled+=filled;
available=_buffer.length();
}
@ -139,85 +138,125 @@ public class WebSocketParserD06 implements WebSocketParser
}
// if we are here, then we have sufficient bytes to process the current state.
// Parse the buffer byte by byte (unless it is STATE_DATA)
byte b;
while (_state!=State.DATA && available-->0)
while (_state!=State.DATA && available>=(_state==State.SKIP?1:_bytesNeeded))
{
switch (_state)
{
case START:
_state=_masked?State.MASK:State.OPCODE;
_bytesNeeded=_state.getNeeds();
continue;
case MASK:
_buffer.get(_mask,0,4);
available-=4;
_state=State.OPCODE;
_bytesNeeded=_state.getNeeds();
_m=0;
continue;
case OPCODE:
b=_buffer.get();
available--;
if (_masked)
b^=_mask[_m++%4];
_opcode=(byte)(b&0xf);
_flags=(byte)(b>>4);
_fin=(_flags&8)!=0;
_state=State.LENGTH_7;
_flags=(byte)(0xf&(b>>4));
if (WebSocketConnectionD06.isControlFrame(_opcode)&&!WebSocketConnectionD06.isLastFrame(_flags))
{
_state=State.SKIP;
_handler.close(WebSocketConnectionD06.CLOSE_PROTOCOL,"fragmented control");
}
else
_state=State.LENGTH_7;
_bytesNeeded=_state.getNeeds();
continue;
case LENGTH_7:
b=_buffer.get();
available--;
if (_masked)
b^=_mask[_m++%4];
switch(b)
{
case 127:
_length=0;
_count=8;
_state=State.LENGTH_63;
_bytesNeeded=_state.getNeeds();
break;
case 126:
_length=0;
_count=2;
_state=State.LENGTH_16;
_bytesNeeded=_state.getNeeds();
break;
default:
_length=(0x7f&b);
_count=(int)_length;
_bytesNeeded=(int)_length;
_state=State.DATA;
}
continue;
case LENGTH_16:
b=_buffer.get();
available--;
if (_masked)
b^=_mask[_m++%4];
_length = _length<<8 | b;
if (--_count==0)
_length = _length*0x100 + (0xff&b);
if (--_bytesNeeded==0)
{
if (_length>=_buffer.capacity()-4)
throw new IllegalStateException("TOO LARGE");
_count=(int)_length;
_state=State.DATA;
_bytesNeeded=(int)_length;
if (_length>_buffer.capacity())
{
_state=State.SKIP;
_handler.close(WebSocketConnectionD06.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity());
}
else
{
_state=State.DATA;
}
}
continue;
case LENGTH_63:
b=_buffer.get();
available--;
if (_masked)
b^=_mask[_m++%4];
_length = _length<<8 | b;
if (--_count==0)
_length = _length*0x100 + (0xff&b);
if (--_bytesNeeded==0)
{
if (_length>=_buffer.capacity()-10)
throw new IllegalStateException("TOO LARGE");
_count=(int)_length;
_state=State.DATA;
_bytesNeeded=(int)_length;
if (_length>=_buffer.capacity())
{
_state=State.SKIP;
_handler.close(WebSocketConnectionD06.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity());
}
else
{
_state=State.DATA;
}
}
continue;
case SKIP:
int skip=Math.min(available,_bytesNeeded);
_buffer.skip(skip);
available-=skip;
_bytesNeeded-=skip;
if (_bytesNeeded==0)
_state=State.START;
}
}
if (_state==State.DATA && available>=_count)
if (_state==State.DATA && available>=_bytesNeeded)
{
Buffer data =_buffer.get(_count);
Buffer data =_buffer.get(_bytesNeeded);
if (_masked)
{
if (data.array()==null)
@ -227,10 +266,11 @@ public class WebSocketParserD06 implements WebSocketParser
for (int i=data.getIndex();i<end;i++)
array[i]^=_mask[_m++%4];
}
_handler.onFrame(!_fin,_flags, _opcode, data);
_count=0;
_state=_masked?State.MASK:State.OPCODE;
// System.err.printf("%s %s %s >>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length());
_handler.onFrame(_flags, _opcode, data);
_bytesNeeded=0;
_state=State.START;
if (_buffer.length()==0)
{

View File

@ -67,7 +67,17 @@ public abstract class WebSocketServlet extends HttpServlet
String protocol=request.getHeader("Sec-WebSocket-Protocol");
if (protocol==null) // TODO remove once draft period is over
protocol=request.getHeader("WebSocket-Protocol");
WebSocket websocket=doWebSocketConnect(request,protocol);
WebSocket websocket=null;
for (String p :WebSocketFactory.parseProtocols(protocol))
{
websocket=doWebSocketConnect(request,p);
if (websocket!=null)
{
protocol=p;
break;
}
}
String host=request.getHeader("Host");
String origin=request.getHeader("Origin");

View File

@ -4,11 +4,12 @@ import static junit.framework.Assert.assertEquals;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $
* @version $Revision$ $Date$
*/
public class WebSocketGeneratorD00Test
{
@ -28,7 +29,8 @@ public class WebSocketGeneratorD00Test
@Test
public void testOneString() throws Exception
{
_generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0);
byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x0,(byte)0x04,data,0,data.length,0);
_generator.flush();
assertEquals(4,_out.get());
assertEquals(15,_out.get());
@ -56,7 +58,7 @@ public class WebSocketGeneratorD00Test
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0xf,b,0,b.length,0);
_generator.addFrame((byte)0x0,(byte)0xf,b,0,b.length,0);
_generator.flush();
assertEquals(0x0f,_out.get());
@ -74,7 +76,7 @@ public class WebSocketGeneratorD00Test
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0xf,b,0,b.length,0);
_generator.addFrame((byte)0x0,(byte)0xf,b,0,b.length,0);
_generator.flush();
assertEquals(0x8f,_out.get()&0xff);

View File

@ -9,7 +9,7 @@ import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $
* @version $Revision$ $Date$
*/
public class WebSocketGeneratorD01Test
{
@ -29,7 +29,8 @@ public class WebSocketGeneratorD01Test
@Test
public void testOneString() throws Exception
{
_generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0);
byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0,(byte)0x04,data,0,data.length,0);
_generator.flush();
assertEquals(4,_out.get());
assertEquals('H',_out.get());
@ -55,7 +56,7 @@ public class WebSocketGeneratorD01Test
{
String string = "Hell\uFF4F W\uFF4Frld";
byte[] bytes=string.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x84,bytes,0,bytes.length,0);
_generator.addFrame((byte)0,(byte)0x84,bytes,0,bytes.length,0);
_generator.flush();
assertEquals(0x84,0xff&_out.get());
assertEquals(15,0xff&_out.get());
@ -83,7 +84,7 @@ public class WebSocketGeneratorD01Test
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x85,b,0,b.length,0);
_generator.addFrame((byte)0,(byte)0x85,b,0,b.length,0);
_generator.flush();
assertEquals(0x85,0xff&_out.get());

View File

@ -43,9 +43,9 @@ public class WebSocketGeneratorD06Test
public void testOneString() throws Exception
{
_generator = new WebSocketGeneratorD06(_buffers, _endPoint,null);
_generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0);
byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,(byte)0x04,data,0,data.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,0xff&_out.get());
@ -73,7 +73,7 @@ public class WebSocketGeneratorD06Test
String string = "Hell\uFF4F W\uFF4Frld";
byte[] bytes=string.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x04,bytes,0,bytes.length,0);
_generator.addFrame((byte)0x8,(byte)0x04,bytes,0,bytes.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,0xff&_out.get());
@ -103,7 +103,7 @@ public class WebSocketGeneratorD06Test
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x4,b,0,b.length,0);
_generator.addFrame((byte)0x8,(byte)0x4,b,0,b.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
@ -119,8 +119,9 @@ public class WebSocketGeneratorD06Test
public void testOneStringMasked() throws Exception
{
_generator = new WebSocketGeneratorD06(_buffers, _endPoint,_maskGen);
_generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0);
byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,(byte)0x04,data,0,data.length,0);
_generator.flush();
_out.get(_mask,0,4);
@ -151,7 +152,7 @@ public class WebSocketGeneratorD06Test
String string = "Hell\uFF4F W\uFF4Frld";
byte[] bytes=string.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x04,bytes,0,bytes.length,0);
_generator.addFrame((byte)0x8,(byte)0x04,bytes,0,bytes.length,0);
_generator.flush();
_out.get(_mask,0,4);
@ -184,7 +185,7 @@ public class WebSocketGeneratorD06Test
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x04,b,0,b.length,0);
_generator.addFrame((byte)0x8,(byte)0x04,b,0,b.length,0);
_generator.flush();
_out.get(_mask,0,4);

View File

@ -24,6 +24,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.thread.QueuedThreadPool;
import org.junit.AfterClass;
import org.junit.BeforeClass;
@ -87,14 +88,14 @@ public class WebSocketLoadTest
clients[i].open();
}
long start = System.nanoTime();
//long start = System.nanoTime();
for (WebSocketClient client : clients)
threadPool.execute(client);
int parallelism = ManagementFactory.getOperatingSystemMXBean().getAvailableProcessors();
long maxTimePerIteration = 5;
assertTrue(latch.await(iterations * (count / parallelism + 1) * maxTimePerIteration, TimeUnit.MILLISECONDS));
long end = System.nanoTime();
//long end = System.nanoTime();
// System.err.println("Elapsed: " + TimeUnit.NANOSECONDS.toMillis(end - start) + " ms");
for (WebSocketClient client : clients)
@ -107,16 +108,16 @@ public class WebSocketLoadTest
}
}
private static class EchoWebSocket implements WebSocket
private static class EchoWebSocket implements WebSocket.OnTextMessage
{
private volatile Outbound outbound;
private volatile Connection outbound;
public void onConnect(Outbound outbound)
public void onConnect(Connection outbound)
{
this.outbound = outbound;
}
public void onMessage(byte frame, String data)
public void onMessage(String data)
{
try
{
@ -125,19 +126,11 @@ public class WebSocketLoadTest
}
catch (IOException x)
{
outbound.disconnect();
outbound.disconnect(0,"");
}
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
public void onDisconnect()
public void onDisconnect(int closeCode, String message)
{
}
}
@ -154,10 +147,14 @@ public class WebSocketLoadTest
private final WebSocketParserD06 _parser;
private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler()
{
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
_response=buffer;
}
public void close(int code,String message)
{
}
};
private volatile Buffer _response;
@ -205,7 +202,8 @@ public class WebSocketLoadTest
String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF";
for (int i = 0; i < iterations; ++i)
{
_generator.addFrame(WebSocket.OP_TEXT,message,10000);
byte[] data = message.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,10000);
_generator.flush(10000);
//System.err.println("-> "+message);

View File

@ -227,9 +227,9 @@ public class WebSocketMessageD00Test
{
boolean onConnect=false;
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Outbound outbound;
private volatile Connection outbound;
public void onConnect(Outbound outbound)
public void onConnect(Connection outbound)
{
this.outbound = outbound;
if (onConnect)
@ -251,19 +251,7 @@ public class WebSocketMessageD00Test
return latch.await(time, TimeUnit.MILLISECONDS);
}
public void onMessage(byte frame, String data)
{
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
public void onDisconnect()
{
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
public void onDisconnect(int code,String message)
{
}
}

View File

@ -283,9 +283,9 @@ public class WebSocketMessageD01Test
boolean onConnect=false;
private final CountDownLatch connected = new CountDownLatch(1);
private final CountDownLatch disconnected = new CountDownLatch(1);
private volatile Outbound outbound;
private volatile Connection outbound;
public void onConnect(Outbound outbound)
public void onConnect(Connection outbound)
{
this.outbound = outbound;
if (onConnect)
@ -312,21 +312,10 @@ public class WebSocketMessageD01Test
return disconnected.await(time, TimeUnit.MILLISECONDS);
}
public void onMessage(byte frame, String data)
{
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
public void onDisconnect()
public void onDisconnect(int code,String message)
{
disconnected.countDown();
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
}
}
}

View File

@ -51,6 +51,7 @@ public class WebSocketMessageD06Test
_serverWebSocket = new TestWebSocket();
_serverWebSocket.onConnect=("onConnect".equals(protocol));
_serverWebSocket.echo=("echo".equals(protocol));
_serverWebSocket.aggregate=("aggregate".equals(protocol));
return _serverWebSocket;
}
};
@ -102,7 +103,7 @@ public class WebSocketMessageD06Test
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
assertNotNull(_serverWebSocket.connection);
// Server sends a big message
StringBuilder message = new StringBuilder();
@ -110,9 +111,9 @@ public class WebSocketMessageD06Test
for (int i = 0; i < (0x2000) / text.length(); i++)
message.append(text);
String data=message.toString();
_serverWebSocket.outbound.sendMessage(data);
_serverWebSocket.connection.sendMessage(data);
assertEquals(WebSocket.OP_TEXT,input.read());
assertEquals(WebSocketConnectionD06.OP_TEXT,input.read());
assertEquals(0x7e,input.read());
assertEquals(0x1f,input.read());
assertEquals(0xf6,input.read());
@ -150,7 +151,7 @@ public class WebSocketMessageD06Test
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
assertNotNull(_serverWebSocket.connection);
assertEquals(0x84,input.read());
assertEquals(0x0f,input.read());
@ -194,7 +195,7 @@ public class WebSocketMessageD06Test
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
assertNotNull(_serverWebSocket.connection);
assertEquals(0x84,input.read());
assertEquals(0x0f,input.read());
@ -234,12 +235,282 @@ public class WebSocketMessageD06Test
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
assertNotNull(_serverWebSocket.connection);
socket.setSoTimeout(1000);
assertEquals(0x83,input.read());
assertEquals(0x00,input.read());
}
@Test
public void testMaxTextSize() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxTextMessageSize(15);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x04^0xff);
output.write(0x0a^0xff);
byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x80^0xff);
output.write(0x0a^0xff);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD06.OP_CLOSE,input.read());
assertEquals(30,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Text message size > 15 chars",input);
}
@Test
public void testMaxTextSize2() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(100000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxTextMessageSize(15);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x04^0xff);
output.write(0x14^0xff);
byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD06.OP_CLOSE,input.read());
assertEquals(30,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Text message size > 15 chars",input);
}
@Test
public void testBinaryAggregate() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: aggregate\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(1024);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(WebSocketConnectionD06.OP_BINARY^0xff);
output.write(0x0a^0xff);
byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x80^0xff);
output.write(0x0a^0xff);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x85,input.read());
assertEquals(20,input.read());
lookFor("01234567890123456789",input);
}
@Test
public void testMaxBinarySize() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(100000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(15);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x0f^0xff);
output.write(0x0a^0xff);
byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x80^0xff);
output.write(0x0a^0xff);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD06.OP_CLOSE,input.read());
assertEquals(19,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Message size > 15",input);
}
@Test
public void testMaxBinarySize2() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /chat HTTP/1.1\r\n"+
"Host: server.example.com\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: other\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n").getBytes("ISO-8859-1"));
output.flush();
socket.setSoTimeout(100000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 Switching Protocols\r\n",input);
skipTo("Sec-WebSocket-Accept: ",input);
lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.connection);
_serverWebSocket.getConnection().setMaxBinaryMessageSize(15);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0xff);
output.write(0x0f^0xff);
output.write(0x14^0xff);
byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1);
for (int i=0;i<bytes.length;i++)
output.write(bytes[i]^0xff);
output.flush();
assertEquals(0x80|WebSocketConnectionD06.OP_CLOSE,input.read());
assertEquals(19,input.read());
int code=(0xff&input.read())*0x100+(0xff&input.read());
assertEquals(1004,code);
lookFor("Message size > 15",input);
}
@Test
public void testIdle() throws Exception
@ -269,7 +540,7 @@ public class WebSocketMessageD06Test
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
assertNotNull(_serverWebSocket.connection);
assertEquals(0x84,input.read());
assertEquals(0x0f,input.read());
@ -277,8 +548,8 @@ public class WebSocketMessageD06Test
assertEquals((byte)0x81,(byte)input.read());
assertEquals(0x06,input.read());
assertEquals(1000/0xff,input.read());
assertEquals(1000%0xff,input.read());
assertEquals(1000/0x100,input.read());
assertEquals(1000%0x100,input.read());
lookFor("Idle",input);
// respond to close
@ -294,7 +565,7 @@ public class WebSocketMessageD06Test
assertTrue(_serverWebSocket.awaitDisconnected(5000));
try
{
_serverWebSocket.outbound.sendMessage("Don't send");
_serverWebSocket.connection.sendMessage("Don't send");
assertTrue(false);
}
catch(IOException e)
@ -332,7 +603,7 @@ public class WebSocketMessageD06Test
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
assertNotNull(_serverWebSocket.connection);
assertEquals(0x84,input.read());
assertEquals(0x0f,input.read());
@ -344,7 +615,7 @@ public class WebSocketMessageD06Test
try
{
_serverWebSocket.outbound.sendMessage("Don't send");
_serverWebSocket.connection.sendMessage("Don't send");
assertTrue(false);
}
catch(IOException e)
@ -361,16 +632,23 @@ public class WebSocketMessageD06Test
ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096);
WebSocketGeneratorD06 gen = new WebSocketGeneratorD06(new WebSocketBuffers(8096),endp,null);
gen.addFrame((byte)0x4,message,1000);
byte[] data = message.getBytes(StringUtil.__UTF8);
gen.addFrame((byte)0x8,(byte)0x4,data,0,data.length,1000);
endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096);
WebSocketParserD06 parser = new WebSocketParserD06(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler()
{
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
received.set(buffer.toString());
}
public void close(int code,String message)
{
}
},false);
parser.parseNext();
@ -388,16 +666,21 @@ public class WebSocketMessageD06Test
WebSocketGeneratorD06.MaskGen maskGen = new WebSocketGeneratorD06.RandomMaskGen();
WebSocketGeneratorD06 gen = new WebSocketGeneratorD06(new WebSocketBuffers(8096),endp,maskGen);
gen.addFrame((byte)0x4,message,1000);
byte[] data = message.getBytes(StringUtil.__UTF8);
gen.addFrame((byte)0x8,(byte)0x4,data,0,data.length,1000);
endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096);
WebSocketParserD06 parser = new WebSocketParserD06(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler()
{
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
received.set(buffer.toString());
}
public void close(int code,String message)
{
}
},true);
parser.parseNext();
@ -455,22 +738,28 @@ public class WebSocketMessageD06Test
}
private static class TestWebSocket implements WebSocket
private static class TestWebSocket implements WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage
{
boolean onConnect=false;
boolean echo=true;
boolean aggregate=false;
private final CountDownLatch connected = new CountDownLatch(1);
private final CountDownLatch disconnected = new CountDownLatch(1);
private volatile Outbound outbound;
private volatile Connection connection;
public void onConnect(Outbound outbound)
public Connection getConnection()
{
this.outbound = outbound;
return connection;
}
public void onConnect(Connection connection)
{
this.connection = connection;
if (onConnect)
{
try
{
outbound.sendMessage("sent on connect");
connection.sendMessage("sent on connect");
}
catch(IOException e)
{
@ -490,54 +779,65 @@ public class WebSocketMessageD06Test
return disconnected.await(time, TimeUnit.MILLISECONDS);
}
public void onMessage(byte opcode, String data)
{
if (echo)
{
try
{
outbound.sendMessage(opcode,data);
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
public void onMessage(byte opcode, byte[] data, int offset, int length)
{
if (echo)
{
try
{
outbound.sendMessage(opcode,data,offset,length);
}
catch(IOException e)
{
e.printStackTrace();
}
}
}
public void onDisconnect()
public void onDisconnect(int code,String message)
{
disconnected.countDown();
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length)
{
if (echo)
{
switch(opcode)
{
case WebSocketConnectionD06.OP_CLOSE:
case WebSocketConnectionD06.OP_PING:
case WebSocketConnectionD06.OP_PONG:
break;
default:
try
{
connection.sendFrame(flags,opcode,data,offset,length);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
return false;
}
public void onMessage(byte[] data, int offset, int length)
{
if (aggregate)
{
try
{
outbound.sendFragment(more,opcode,data,offset,length);
connection.sendMessage(data,offset,length);
}
catch(IOException e)
catch (IOException e)
{
e.printStackTrace();
}
}
}
public void onMessage(String data)
{
if (aggregate)
{
try
{
connection.sendMessage(data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
}

View File

@ -147,7 +147,7 @@ public class WebSocketMessageW75Test
for (int i = 0; i < 64 * 1024 / text.length(); ++i)
message.append(text);
byte[] data = message.toString().getBytes("UTF-8");
_serverWebSocket.outbound.sendMessage(WebSocket.LENGTH_FRAME, data,0,data.length);
_serverWebSocket.outbound.sendMessage(data,0,data.length);
// Length of the message is 65536, so the length will be encoded as 0x84 0x80 0x00
int frame = input.read();
@ -170,9 +170,9 @@ public class WebSocketMessageW75Test
private static class TestWebSocket implements WebSocket
{
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Outbound outbound;
private volatile Connection outbound;
public void onConnect(Outbound outbound)
public void onConnect(Connection outbound)
{
this.outbound = outbound;
latch.countDown();
@ -183,20 +183,9 @@ public class WebSocketMessageW75Test
return latch.await(time, TimeUnit.MILLISECONDS);
}
public void onMessage(byte frame, String data)
public void onDisconnect(int code,String data)
{
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
}
public void onDisconnect()
{
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
}
}
}

View File

@ -17,7 +17,7 @@ import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $
* @version $Revision$ $Date$
*/
public class WebSocketParserD00Test
{
@ -141,9 +141,13 @@ public class WebSocketParserD00Test
{
public List<String> _data = new ArrayList<String>();
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
_data.add(buffer.toString(StringUtil.__UTF8));
}
public void close(int code,String message)
{
}
}
}

View File

@ -18,7 +18,7 @@ import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $
* @version $Revision$ $Date$
*/
public class WebSocketParserD01Test
{
@ -169,9 +169,9 @@ public class WebSocketParserD01Test
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
public List<String> _data = new ArrayList<String>();
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
if (more)
if ((flags&0x8)!=0)
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
else if (_utf8.length()==0)
_data.add(opcode,buffer.toString("utf-8"));
@ -182,5 +182,9 @@ public class WebSocketParserD01Test
_utf8.reset();
}
}
public void close(int code,String message)
{
}
}
}

View File

@ -13,7 +13,6 @@ import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.Before;
import org.junit.Test;
@ -94,6 +93,22 @@ public class WebSocketParserD06Test
assertEquals(HttpHeaderValues.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValues.CACHE.lookup("Upgrade")).getOrdinal());
}
@Test
public void testFlagsOppcode() throws Exception
{
_in.sendMask();
_in.put((byte)0xff);
_in.put((byte)0);
int filled =_parser.parseNext();
assertEquals(6,filled);
assertEquals(0xf,_handler._flags);
assertEquals(0xf,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortText() throws Exception
{
@ -107,6 +122,8 @@ public class WebSocketParserD06Test
assertEquals(17,filled);
assertEquals("Hello World",_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x4,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@ -126,6 +143,8 @@ public class WebSocketParserD06Test
assertEquals(bytes.length+6,filled);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x4,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@ -138,8 +157,8 @@ public class WebSocketParserD06Test
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
byte[] bytes = string.getBytes(StringUtil.__UTF8);
_in.sendMask();
_in.put((byte)0x84);
_in.put((byte)0x7E);
@ -151,6 +170,8 @@ public class WebSocketParserD06Test
assertEquals(bytes.length+8,filled);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x4,_handler._opcode);
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@ -158,14 +179,12 @@ public class WebSocketParserD06Test
@Test
public void testLongText() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(0x20000);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
WebSocketParser parser=new WebSocketParserD06(buffers, endPoint,_handler,false);
ByteArrayBuffer in = new ByteArrayBuffer(0x20000);
endPoint.setIn(in);
String string = "Hell\uFF4f Big W\uFF4Frld ";
for (int i=0;i<12;i++)
string = string+string;
@ -221,15 +240,59 @@ public class WebSocketParserD06Test
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testFrameTooLarge() throws Exception
{
_in.sendMask();
_in.put((byte)0x84);
_in.put((byte)0x7E);
_in.put((byte)(2048>>8));
_in.put((byte)(2048&0xff));
int filled =_parser.parseNext();
assertEquals(8,filled);
assertEquals(WebSocketConnectionD06.CLOSE_LARGE,_handler._code);
for (int i=0;i<2048;i++)
_in.put((byte)'a');
filled =_parser.parseNext();
assertEquals(2048,filled);
assertEquals(0,_handler._data.size());
assertEquals(0,_handler._utf8.length());
_handler._code=0;
_handler._message=null;
_in.sendMask();
_in.put((byte)0x84);
_in.put((byte)0x7E);
_in.put((byte)(1024>>8));
_in.put((byte)(1024&0xff));
for (int i=0;i<1024;i++)
_in.put((byte)'a');
filled =_parser.parseNext();
assertEquals(1024+8,filled);
assertEquals(1,_handler._data.size());
assertEquals(1024,_handler._data.get(0).length());
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
public List<String> _data = new ArrayList<String>();
private byte _flags;
private byte _opcode;
int _code;
String _message;
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
if (more)
_flags=flags;
_opcode=opcode;
if ((flags&0x8)==0)
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
else if (_utf8.length()==0)
_data.add(buffer.toString("utf-8"));
@ -240,5 +303,11 @@ public class WebSocketParserD06Test
_utf8.reset();
}
}
public void close(int code,String message)
{
_code=code;
_message=message;
}
}
}

View File

@ -1,200 +0,0 @@
package org.eclipse.jetty.websocket;
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.Socket;
import java.security.SecureRandom;
import java.util.Random;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import junit.framework.Assert;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.TypeUtil;
/**
* @version $Revision$ $Date$
*/
public class WebSocketPingD06
{
private final static Random __random = new SecureRandom();
private final String _host;
private final int _port;
private final int _size=64;
private final Socket _socket;
private final BufferedWriter _output;
private final BufferedReader _input;
private final SocketEndPoint _endp;
private final WebSocketGeneratorD06 _generator;
private final WebSocketParserD06 _parser;
private int _sent;
private int _received;
private long _totalTime;
private long _minDuration=Long.MAX_VALUE;
private long _maxDuration=Long.MIN_VALUE;
private long _start;
private BlockingQueue<Long> _starts = new LinkedBlockingQueue<Long>();
private BlockingQueue<String> _pending = new LinkedBlockingQueue<String>();
private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler()
{
public synchronized void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
{
long start=_starts.poll();
String data=_pending.poll();
while (!data.equals(TypeUtil.toHexString(buffer.asArray())) && !_starts.isEmpty() && !_pending.isEmpty())
{
// Missed response
start=_starts.poll();
data=_pending.poll();
}
_received++;
long duration = System.nanoTime()-start;
if (duration>_maxDuration)
_maxDuration=duration;
if (duration<_minDuration)
_minDuration=duration;
_totalTime+=duration;
System.out.print(buffer.length()+" bytes from "+_host+": req="+_received+" time=");
System.out.printf("%.1fms\n",((double)duration/1000000.0));
}
};
public WebSocketPingD06(String host, int port,int timeoutMS) throws IOException
{
_host=host;
_port=port;
_socket = new Socket(host, port);
_socket.setSoTimeout(timeoutMS);
_output = new BufferedWriter(new OutputStreamWriter(_socket.getOutputStream(), "ISO-8859-1"));
_input = new BufferedReader(new InputStreamReader(_socket.getInputStream(), "ISO-8859-1"));
_endp=new SocketEndPoint(_socket);
_generator = new WebSocketGeneratorD06(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD06.FixedMaskGen());
_parser = new WebSocketParserD06(new WebSocketBuffers(32*1024),_endp,_handler,false);
}
private void open() throws IOException
{
System.out.println("Jetty WebSocket PING "+_host+":"+_port+
" ("+_socket.getRemoteSocketAddress()+") " +_size+" bytes of data.");
byte[] key = new byte[16];
__random.nextBytes(key);
_output.write("GET /chat HTTP/1.1\r\n"+
"Host: "+_host+":"+_port+"\r\n"+
"Upgrade: websocket\r\n"+
"Connection: Upgrade\r\n"+
"Sec-WebSocket-Key: "+new String(B64Code.encode(key))+"\r\n"+
"Sec-WebSocket-Origin: http://example.com\r\n"+
"Sec-WebSocket-Protocol: lws-mirror-protocol\r\n" +
"Sec-WebSocket-Version: 6\r\n"+
"\r\n");
_output.flush();
String responseLine = _input.readLine();
if(!responseLine.startsWith("HTTP/1.1 101 Switching Protocols"))
throw new IOException(responseLine);
// Read until we find Response key
String line;
boolean accepted=false;
String protocol="";
while ((line = _input.readLine()) != null)
{
if (line.length() == 0)
break;
if (line.startsWith("Sec-WebSocket-Accept:"))
{
String accept=line.substring(21).trim();
accepted=accept.equals(WebSocketConnectionD06.hashKey(new String(B64Code.encode(key))));
}
else if (line.startsWith("Sec-WebSocket-Protocol:"))
{
protocol=line.substring(24).trim();
}
}
if (!accepted)
throw new IOException("Bad Sec-WebSocket-Accept");
System.out.println("handshake OK for protocol "+protocol);
new Thread()
{
public void run()
{
while (_endp.isOpen())
_parser.parseNext();
}
}.start();
}
public void run()
{
_start=System.currentTimeMillis();
for (int i=0;i<10;i++)
{
try
{
byte data[] = new byte[_size];
__random.nextBytes(data);
_starts.add(System.nanoTime());
_pending.add(TypeUtil.toHexString(data));
_sent++;
_generator.addFrame(WebSocket.OP_PING,data,_socket.getSoTimeout());
_generator.flush(_socket.getSoTimeout());
Thread.sleep(1000);
}
catch (Exception x)
{
throw new RuntimeException(x);
}
}
}
public void dump() throws IOException
{
_socket.close();
long duration=System.currentTimeMillis()-_start;
System.out.println("--- "+_host+" websocket ping statistics using 1 connection ---");
System.out.println(_sent+" packets transmitted, "+_received+" received, "+
String.format("%d",100*(_sent-_received)/_sent)+"% loss, time "+duration+"ms");
System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",_minDuration/1000000.0,_totalTime/_received/1000000.0,_maxDuration/1000000.0);
}
public static void main(String[] args)
throws Exception
{
WebSocketPingD06 ping = new WebSocketPingD06("localhost",8080,10000);
try
{
ping.open();
ping.run();
}
finally
{
ping.dump();
}
}
}

View File

@ -1,102 +0,0 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.util.concurrent.ConcurrentLinkedQueue;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
public class WebSocketTestServer extends Server
{
TestWebSocket _websocket;
SelectChannelConnector _connector;
WebSocketHandler _handler;
ConcurrentLinkedQueue<TestWebSocket> _webSockets = new ConcurrentLinkedQueue<TestWebSocket>();
public WebSocketTestServer()
{
_connector = new SelectChannelConnector();
_connector.setPort(8080);
addConnector(_connector);
_handler = new WebSocketHandler()
{
@Override
protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
_websocket = new TestWebSocket();
return _websocket;
}
};
setHandler(_handler);
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket
{
Outbound _outbound;
public void onConnect(Outbound outbound)
{
System.err.println("onConnect");
_outbound = outbound;
_webSockets.add(this);
}
public void onMessage(byte frame, byte[] data, int offset, int length)
{
System.err.println("onMessage " + TypeUtil.toHexString(data,offset,length));
}
public void onMessage(final byte frame, final String data)
{
System.err.println("onMessage " + data);
for (TestWebSocket ws : _webSockets)
{
if (ws != this)
{
try
{
if (ws._outbound.isOpen())
ws._outbound.sendMessage(frame, data);
}
catch (IOException e)
{
e.printStackTrace();
}
}
}
}
public void onDisconnect()
{
_webSockets.remove(this);
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
}
}
public static void main(String[] args)
{
try
{
WebSocketTestServer server = new WebSocketTestServer();
server.start();
server.join();
}
catch (Exception e)
{
Log.warn(e);
}
}
}

View File

@ -32,14 +32,14 @@ public class WebSocketChatServlet extends WebSocketServlet
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class ChatWebSocket implements WebSocket
class ChatWebSocket implements WebSocket.OnTextMessage
{
Outbound _outbound;
Connection _connection;
public void onConnect(Outbound outbound)
public void onConnect(Connection connection)
{
// Log.info(this+" onConnect");
_outbound=outbound;
_connection=connection;
_members.add(this);
}
@ -48,10 +48,10 @@ public class WebSocketChatServlet extends WebSocketServlet
// Log.info(this+" onMessage: "+TypeUtil.toHexString(data,offset,length));
}
public void onMessage(byte frame, String data)
public void onMessage(String data)
{
if (data.indexOf("disconnect")>=0)
_outbound.disconnect();
_connection.disconnect(0,"");
else
{
// Log.info(this+" onMessage: "+data);
@ -59,7 +59,7 @@ public class WebSocketChatServlet extends WebSocketServlet
{
try
{
member._outbound.sendMessage(frame,data);
member._connection.sendMessage(data);
}
catch(IOException e)
{
@ -69,14 +69,11 @@ public class WebSocketChatServlet extends WebSocketServlet
}
}
public void onDisconnect()
public void onDisconnect(int code, String message)
{
// Log.info(this+" onDisconnect");
_members.remove(this);
}
public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length)
{
}
}
}