diff --git a/VERSION.txt b/VERSION.txt index 0330118ed37..522797db1c0 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -3,6 +3,9 @@ jetty-7.3.2-SNAPSHOT + Ensure generated fragment names are unique + 339187 In the OSGi manifest of the jetty-all-server aggregate, mark javax.annotation as optional +jetty-7.3.2-SNAPSHOT + + 337685 Update websocket API in preparation for draft -07 + jetty-7.3.1.v20110307 7 March 2011 + 316382 Support a more strict SSL option with certificates + 333481 Handle UCS-4 codepoints in decode and encode diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java index e615d3aee4a..6e29d3ec156 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java @@ -70,34 +70,26 @@ public class WebSocketUpgradeTest extends TestCase public void testGetWithContentExchange() throws Exception { - final WebSocket clientWS = new WebSocket() + final WebSocket clientWS = new WebSocket.OnTextMessage() { - Outbound _outbound; - - public void onConnect(Outbound outbound) + Connection _connection; + + public void onDisconnect(int closeCode, String message) { - _outbound=outbound; + } + + public void onConnect(Connection connection) + { + _connection=connection; _results.add("clientWS.onConnect"); - _results.add(_outbound); + _results.add(_connection); } - - public void onDisconnect() - { - } - - public void onMessage(byte frame, String data) + + public void onMessage(String data) { _results.add("clientWS.onMessage"); _results.add(data); } - - public void onMessage(byte frame, byte[] data, int offset, int length) - { - } - - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) - { - } }; @@ -217,29 +209,25 @@ public class WebSocketUpgradeTest extends TestCase /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - class TestWebSocket implements WebSocket + class TestWebSocket implements WebSocket.OnTextMessage { - Outbound _outbound; + Connection _connection; - public void onConnect(Outbound outbound) + public void onConnect(Connection connection) { - _outbound=outbound; + _connection=connection; _webSockets.add(this); _results.add("serverWS.onConnect"); _results.add(this); } - public void onMessage(byte frame, byte[] data,int offset, int length) - { - } - - public void onMessage(final byte frame, final String data) + public void onMessage(final String data) { _results.add("serverWS.onMessage"); _results.add(data); } - public void onDisconnect() + public void onDisconnect(int code, String message) { _results.add("onDisconnect"); _webSockets.remove(this); @@ -247,11 +235,7 @@ public class WebSocketUpgradeTest extends TestCase public void sendMessage(String msg) throws IOException { - _outbound.sendMessage(msg); - } - - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) - { + _connection.sendMessage(msg); } } } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index 1ba36a69d89..b5cf61c1ed5 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -15,6 +15,7 @@ package org.eclipse.jetty.io.nio; import java.io.IOException; import java.nio.channels.CancelledKeyException; +import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectableChannel; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; @@ -606,6 +607,13 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa }); } } + catch (ClosedSelectorException e) + { + if (isRunning()) + Log.warn(e); + else + Log.ignore(e); + } catch (CancelledKeyException e) { Log.ignore(e); diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java index b578b2ef4b3..ba29bb23811 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java @@ -383,6 +383,12 @@ public class TypeUtil } } + /* ------------------------------------------------------------ */ + public static String toHexString(byte b) + { + return toHexString(new byte[]{b}, 0, 1); + } + /* ------------------------------------------------------------ */ public static String toHexString(byte[] b) { diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/Utf8StringBuilder.java b/jetty-util/src/main/java/org/eclipse/jetty/util/Utf8StringBuilder.java index f450ca52d24..da9086420e6 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/Utf8StringBuilder.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/Utf8StringBuilder.java @@ -48,6 +48,25 @@ public class Utf8StringBuilder for (int i=offset; imaxChars) + return false; + append(b[i]); + } + return true; + } public void append(byte b) { diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD0.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD0.java deleted file mode 100644 index 62490c81a72..00000000000 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD0.java +++ /dev/null @@ -1,55 +0,0 @@ -// ======================================================================== -// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== - -package org.eclipse.jetty.websocket; - -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.util.Utf8StringBuilder; -import org.eclipse.jetty.util.log.Log; - -final class FrameHandlerD0 implements WebSocketParser.FrameHandler -{ - final WebSocket _websocket; - final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); - - FrameHandlerD0(WebSocket websocket) - { - _websocket=websocket; - } - - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) - { - assert more==false; - try - { - byte[] array=buffer.array(); - - if (opcode==0) - { - _websocket.onMessage(opcode,buffer.toString("utf-8")); - } - else - { - _websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length()); - } - } - catch(ThreadDeath th) - { - throw th; - } - catch(Throwable th) - { - Log.warn(th); - } - } -} \ No newline at end of file diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD1.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD1.java deleted file mode 100644 index d1895917b86..00000000000 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD1.java +++ /dev/null @@ -1,94 +0,0 @@ -// ======================================================================== -// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== - -package org.eclipse.jetty.websocket; - -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.util.Utf8StringBuilder; -import org.eclipse.jetty.util.log.Log; - -final class FrameHandlerD1 implements WebSocketParser.FrameHandler -{ - public final static byte PING=1; - public final static byte PONG=1; - - final WebSocketConnectionD00 _connection; - final WebSocket _websocket; - final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); - boolean _fragmented=false; - - FrameHandlerD1(WebSocketConnectionD00 connection, WebSocket websocket) - { - _connection=connection; - _websocket=websocket; - } - - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) - { - try - { - byte[] array=buffer.array(); - - if (opcode==0) - { - if (more) - { - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - _fragmented=true; - } - else if (_fragmented) - { - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - _websocket.onMessage(opcode,_utf8.toString()); - _utf8.reset(); - _fragmented=false; - } - else - { - _websocket.onMessage(opcode,buffer.toString("utf-8")); - } - } - else if (opcode==PING) - { - _connection.sendMessage(PONG,buffer.array(),buffer.getIndex(),buffer.length()); - } - else if (opcode==PONG) - { - - } - else - { - if (more) - { - _websocket.onFragment(true,opcode,array,buffer.getIndex(),buffer.length()); - } - else if (_fragmented) - { - _websocket.onFragment(false,opcode,array,buffer.getIndex(),buffer.length()); - } - else - { - _websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length()); - } - } - } - catch(ThreadDeath th) - { - throw th; - } - catch(Throwable th) - { - Log.warn(th); - } - } -} \ No newline at end of file diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java new file mode 100644 index 00000000000..46eb7e5417e --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java @@ -0,0 +1,302 @@ +package org.eclipse.jetty.websocket; + +import java.io.BufferedReader; +import java.io.BufferedWriter; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.OutputStreamWriter; +import java.net.Socket; +import java.security.SecureRandom; +import java.util.Arrays; +import java.util.Random; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.bio.SocketEndPoint; +import org.eclipse.jetty.util.B64Code; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.log.Log; + +/** + * @version $Revision$ $Date$ + */ +public class TestClient +{ + private final static Random __random = new SecureRandom(); + private final String _host; + private final int _port; + private final String _protocol; + private int _size=64; + private final Socket _socket; + private final BufferedWriter _output; + private final BufferedReader _input; + private final SocketEndPoint _endp; + private final WebSocketGeneratorD06 _generator; + private final WebSocketParserD06 _parser; + private int _sent; + private int _received; + private long _totalTime; + private long _minDuration=Long.MAX_VALUE; + private long _maxDuration=Long.MIN_VALUE; + private long _start; + private BlockingQueue _starts = new LinkedBlockingQueue(); + private BlockingQueue _pending = new LinkedBlockingQueue(); + + private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler() + { + public synchronized void onFrame(byte flags, byte opcode, Buffer buffer) + { + try + { + if (opcode == WebSocketConnectionD06.OP_CLOSE) + { + byte[] data=buffer.asArray(); + System.err.println("CLOSED: "+((0xff&data[0])*0x100+(0xff&data[1]))+" "+new String(data,2,data.length-2,StringUtil.__UTF8)); + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,data,0,data.length,_socket.getSoTimeout()); + _generator.flush(_socket.getSoTimeout()); + _socket.shutdownOutput(); + _socket.close(); + return; + } + + Long start=_starts.take(); + String data=_pending.take(); + + while (!data.equals(TypeUtil.toHexString(buffer.asArray())) && !_starts.isEmpty() && !_pending.isEmpty()) + { + // Missed response + start=_starts.take(); + data=_pending.take(); + } + + _received++; + + long duration = System.nanoTime()-start.longValue(); + if (duration>_maxDuration) + _maxDuration=duration; + if (duration<_minDuration) + _minDuration=duration; + _totalTime+=duration; + System.out.printf("%d bytes from %s: req=%d time=%.1fms opcode=0x%s\n",buffer.length(),_host,_received,((double)duration/1000000.0),TypeUtil.toHexString(opcode)); + } + catch(Exception e) + { + e.printStackTrace(); + } + + } + + public void close(int code,String message) + { + } + }; + + + public TestClient(String host, int port,String protocol, int timeoutMS) throws IOException + { + _host=host; + _port=port; + _protocol=protocol; + _socket = new Socket(host, port); + _socket.setSoTimeout(timeoutMS); + _output = new BufferedWriter(new OutputStreamWriter(_socket.getOutputStream(), "ISO-8859-1")); + _input = new BufferedReader(new InputStreamReader(_socket.getInputStream(), "ISO-8859-1")); + + _endp=new SocketEndPoint(_socket); + _generator = new WebSocketGeneratorD06(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD06.FixedMaskGen()); + _parser = new WebSocketParserD06(new WebSocketBuffers(32*1024),_endp,_handler,false); + } + + public int getSize() + { + return _size; + } + + public void setSize(int size) + { + _size = size; + } + + private void open() throws IOException + { + System.out.println("Jetty WebSocket PING "+_host+":"+_port+ + " ("+_socket.getRemoteSocketAddress()+") " +_size+" bytes of data."); + byte[] key = new byte[16]; + __random.nextBytes(key); + + + _output.write("GET /chat HTTP/1.1\r\n"+ + "Host: "+_host+":"+_port+"\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: "+new String(B64Code.encode(key))+"\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: "+_protocol+"\r\n" + + "Sec-WebSocket-Version: 6\r\n"+ + "\r\n"); + _output.flush(); + + String responseLine = _input.readLine(); + if(!responseLine.startsWith("HTTP/1.1 101 Switching Protocols")) + throw new IOException(responseLine); + // Read until we find Response key + String line; + boolean accepted=false; + String protocol=""; + while ((line = _input.readLine()) != null) + { + if (line.length() == 0) + break; + if (line.startsWith("Sec-WebSocket-Accept:")) + { + String accept=line.substring(21).trim(); + accepted=accept.equals(WebSocketConnectionD06.hashKey(new String(B64Code.encode(key)))); + } + else if (line.startsWith("Sec-WebSocket-Protocol:")) + { + protocol=line.substring(24).trim(); + } + } + + if (!accepted) + throw new IOException("Bad Sec-WebSocket-Accept"); + System.out.println("handshake OK for protocol '"+protocol+"'"); + + new Thread() + { + public void run() + { + while (_endp.isOpen()) + _parser.parseNext(); + } + }.start(); + } + + public void ping(int count,byte opcode,int fragment) + { + _start=System.currentTimeMillis(); + for (int i=0;i0&& len>fragment) + len=fragment; + while(offlen) + len=data.length-off; + } + _generator.flush(_socket.getSoTimeout()); + + Thread.sleep(1000); + + } + catch (Exception x) + { + throw new RuntimeException(x); + } + } + } + + + public void dump() throws IOException + { + _socket.close(); + long duration=System.currentTimeMillis()-_start; + System.out.println("--- "+_host+" websocket ping statistics using 1 connection ---"); + System.out.println(_sent+" packets transmitted, "+_received+" received, "+ + (_sent>0?String.format("%d",100*(_sent-_received)/_sent)+"% loss, ":"")+ + "time "+duration+"ms"); + System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",_minDuration/1000000.0,_received==0?0.0:(_totalTime/_received/1000000.0),_maxDuration/1000000.0); + } + + + private static void usage(String[] args) + { + System.err.println("ERROR: "+Arrays.asList(args)); + System.err.println("USAGE: java -cp CLASSPATH "+TestClient.class+" [ OPTIONS ]"); + System.err.println(" -h|--host HOST (default localhost)"); + System.err.println(" -p|--port PORT (default 8080)"); + System.err.println(" -v|--verbose"); + System.err.println(" -c|--count n (default 10)"); + System.err.println(" -s|--size n (default 64)"); + System.err.println(" -f|--fragment n (default 4000) "); + System.err.println(" -P|--protocol echo|echo-assemble|echo-fragment"); + System.exit(1); + } + + public static void main(String[] args) + { + try + { + String host="localhost"; + int port=8080; + boolean verbose=false; + String protocol=null; + int count=10; + int size=64; + int fragment=4000; + + for (int i=0;i _webSockets = new ConcurrentLinkedQueue(); + + public TestServer(int port) + { + _connector = new SelectChannelConnector(); + _connector.setPort(port); + + addConnector(_connector); + _handler = new WebSocketHandler() + { + @Override + protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) + { + if ("org.ietf.websocket.test-echo".equals(protocol) || "echo".equals(protocol) || "lws-mirror-protocol".equals(protocol)) + { + _websocket = new TestEchoWebSocket(); + } + else if ("org.ietf.websocket.test-echo-broadcast".equals(protocol)) + { + _websocket = new TestEchoBroadcastWebSocket(); + + } + else if ("org.ietf.websocket.test-echo-assemble".equals(protocol)) + { + + } + else if ("org.ietf.websocket.test-echo-fragment".equals(protocol)) + { + + } + else if ("org.ietf.websocket.test-consume".equals(protocol)) + { + + } + else if ("org.ietf.websocket.test-produce".equals(protocol)) + { + + } + else if (protocol==null) + { + _websocket = new TestWebSocket(); + } + return _websocket; + } + }; + + setHandler(_handler); + } + + + public boolean isVerbose() + { + return _verbose; + } + + public void setVerbose(boolean verbose) + { + _verbose = verbose; + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl + { + protected Connection _connection; + + public Connection getOutbound() + { + return _connection; + } + + public void onConnect(Connection connection) + { + _connection = connection; + _webSockets.add(this); + } + + public void onDisconnect(int code,String message) + { + _webSockets.remove(this); + } + + public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length) + { + if (_verbose) + System.err.printf("%s#onFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),TypeUtil.toHexString(data,offset,length)); + return false; + } + + public boolean onControl(byte controlCode, byte[] data, int offset, int length) + { + if (_verbose) + System.err.printf("%s#onControl %s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(controlCode),TypeUtil.toHexString(data,offset,length)); + return false; + } + + public void onMessage(String data) + { + if (_verbose) + System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),data); + } + + public void onMessage(byte[] data, int offset, int length) + { + if (_verbose) + System.err.printf("%s#onMessage %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(data,offset,length)); + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + class TestEchoWebSocket extends TestWebSocket + { + @Override + public void onConnect(Connection connection) + { + super.onConnect(connection); + connection.setMaxTextMessageSize(-1); + connection.setMaxBinaryMessageSize(-1); + } + + @Override + public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length) + { + super.onFrame(flags,opcode,data,offset,length); + try + { + switch(opcode) + { + case WebSocketConnectionD06.OP_CLOSE: + case WebSocketConnectionD06.OP_PING: + case WebSocketConnectionD06.OP_PONG: + break; + default: + getOutbound().sendFrame(flags,opcode,data,offset,length); + } + } + catch (IOException e) + { + e.printStackTrace(); + } + + return false; + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + class TestEchoBroadcastWebSocket extends TestWebSocket + { + @Override + public void onMessage(byte[] data, int offset, int length) + { + super.onMessage(data,offset,length); + for (TestWebSocket ws : _webSockets) + { + try + { + ws.getOutbound().sendMessage(data,offset,length); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + @Override + public void onMessage(final String data) + { + super.onMessage(data); + for (TestWebSocket ws : _webSockets) + { + try + { + ws.getOutbound().sendMessage(data); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + class TestEchoAssembleWebSocket extends TestWebSocket + { + + @Override + public void onConnect(Connection connection) + { + super.onConnect(connection); + connection.setMaxTextMessageSize(64*1024); + connection.setMaxBinaryMessageSize(64*1024); + } + + @Override + public void onMessage(byte[] data, int offset, int length) + { + super.onMessage(data,offset,length); + try + { + getOutbound().sendMessage(data,offset,length); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + + @Override + public void onMessage(final String data) + { + super.onMessage(data); + try + { + getOutbound().sendMessage(data); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + private static void usage() + { + System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]"); + System.err.println(" -p|--port PORT "); + System.err.println(" -v|--verbose "); + System.exit(1); + } + + public static void main(String[] args) + { + try + { + int port=8080; + boolean verbose=false; + + for (int i=0;i=0 max size of text frame aggregation buffer in characters + */ + int getMaxTextMessageSize(); + + /** + * Size in bytes of the maximum binary message to be received + * @return <0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer + */ + int getMaxBinaryMessageSize(); } + } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java index 392ba373a62..b6cefe170ff 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java @@ -28,10 +28,16 @@ import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.IndirectNIOBuffer; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.log.Log; -public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.Outbound +public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.Connection { + public final static byte LENGTH_FRAME=(byte)0x80; + public final static byte SENTINEL_FRAME=(byte)0x00; + + final IdleCheck _idle; final WebSocketParser _parser; final WebSocketGenerator _generator; @@ -63,7 +69,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc { case 1: _generator = new WebSocketGeneratorD01(buffers, _endp); - _parser = new WebSocketParserD01(buffers, endpoint, new FrameHandlerD1(this,_websocket)); + _parser = new WebSocketParserD01(buffers, endpoint, new FrameHandlerD1(_websocket)); break; default: _generator = new WebSocketGeneratorD00(buffers, _endp); @@ -215,59 +221,60 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc public void closed() { - _websocket.onDisconnect(); + _websocket.onDisconnect(0,""); } /* ------------------------------------------------------------ */ /** - * {@inheritDoc} */ public void sendMessage(String content) throws IOException { - sendMessage(WebSocket.SENTINEL_FRAME,content); - } - - /* ------------------------------------------------------------ */ - /** - * {@inheritDoc} - */ - public void sendMessage(byte frame, String content) throws IOException - { - _generator.addFrame(frame,content,_endp.getMaxIdleTime()); + byte[] data = content.getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0,SENTINEL_FRAME,data,0,data.length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); } /* ------------------------------------------------------------ */ - /** - * {@inheritDoc} - */ - public void sendMessage(byte opcode, byte[] content, int offset, int length) throws IOException + public void sendMessage(byte[] data, int offset, int length) throws IOException { - _generator.addFrame(opcode,content,offset,length,_endp.getMaxIdleTime()); + _generator.addFrame((byte)0,LENGTH_FRAME,data,offset,length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); } + /* ------------------------------------------------------------ */ + public boolean isMore(byte flags) + { + return (flags&0x8) != 0; + } + /* ------------------------------------------------------------ */ /** * {@inheritDoc} */ - public void sendFragment(boolean more,byte opcode, byte[] content, int offset, int length) throws IOException + public void sendControl(byte code, byte[] content, int offset, int length) throws IOException { - _generator.addFragment(!more,opcode,content,offset,length,_endp.getMaxIdleTime()); + } + + /* ------------------------------------------------------------ */ + public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException + { + _generator.addFrame((byte)0,opcode,content,offset,length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); } + /* ------------------------------------------------------------ */ public void disconnect(int code, String message) { throw new UnsupportedOperationException(); } - + + /* ------------------------------------------------------------ */ public void disconnect() { try @@ -379,4 +386,148 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc } } + public void setMaxTextMessageSize(int size) + { + } + + public void setMaxBinaryMessageSize(int size) + { + } + + public int getMaxTextMessageSize() + { + return -1; + } + + public int getMaxBinaryMessageSize() + { + return -1; + } + + class FrameHandlerD0 implements WebSocketParser.FrameHandler + { + final WebSocket _websocket; + final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + + FrameHandlerD0(WebSocket websocket) + { + _websocket=websocket; + } + + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + try + { + byte[] array=buffer.array(); + + if (opcode==0) + { + if (_websocket instanceof WebSocket.OnTextMessage) + ((WebSocket.OnTextMessage)_websocket).onMessage(buffer.toString(StringUtil.__UTF8)); + } + else + { + if (_websocket instanceof WebSocket.OnBinaryMessage) + ((WebSocket.OnBinaryMessage)_websocket).onMessage(array,buffer.getIndex(),buffer.length()); + } + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); + } + } + + public void close(int code,String message) + { + disconnect(code,message); + } + } + + class FrameHandlerD1 implements WebSocketParser.FrameHandler + { + public final static byte PING=1; + public final static byte PONG=1; + + final WebSocket _websocket; + final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + boolean _fragmented=false; + + FrameHandlerD1(WebSocket websocket) + { + _websocket=websocket; + } + + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + try + { + byte[] array=buffer.array(); + + if (opcode==0) + { + if (isMore(flags)) + { + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + _fragmented=true; + } + else if (_fragmented) + { + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + if (_websocket instanceof WebSocket.OnTextMessage) + ((WebSocket.OnTextMessage)_websocket).onMessage(_utf8.toString()); + _utf8.reset(); + _fragmented=false; + } + else + { + if (_websocket instanceof WebSocket.OnTextMessage) + ((WebSocket.OnTextMessage)_websocket).onMessage(buffer.toString(StringUtil.__UTF8)); + } + } + else if (opcode==PING) + { + sendFrame(flags,PONG,buffer.array(),buffer.getIndex(),buffer.length()); + } + else if (opcode==PONG) + { + + } + else + { + if (isMore(flags)) + { + if (_websocket instanceof WebSocket.OnFrame) + ((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()); + } + else if (_fragmented) + { + if (_websocket instanceof WebSocket.OnFrame) + ((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()); + } + else + { + if (_websocket instanceof WebSocket.OnBinaryMessage) + ((WebSocket.OnBinaryMessage)_websocket).onMessage(array,buffer.getIndex(),buffer.length()); + } + } + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); + } + } + + public void close(int code,String message) + { + disconnect(code,message); + } + } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java index 954b4166a48..39329ab5eee 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java @@ -23,24 +23,67 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.websocket.WebSocket.OnFrame; +import org.eclipse.jetty.websocket.WebSocket.OnTextMessage; +import org.eclipse.jetty.websocket.WebSocket.OnBinaryMessage; +import org.eclipse.jetty.websocket.WebSocket.OnControl; public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection { + public final static byte OP_CONTINUATION = 0x00; + public final static byte OP_CLOSE = 0x01; + public final static byte OP_PING = 0x02; + public final static byte OP_PONG = 0x03; + public final static byte OP_TEXT = 0x04; + public final static byte OP_BINARY = 0x05; + + public final static int CLOSE_NORMAL=1000; + public final static int CLOSE_SHUTDOWN=1001; + public final static int CLOSE_PROTOCOL=1002; + public final static int CLOSE_BADDATA=1003; + public final static int CLOSE_LARGE=1004; + + public static boolean isLastFrame(int flags) + { + return (flags&0x8)!=0; + } + + public static boolean isControlFrame(int opcode) + { + switch(opcode) + { + case OP_CLOSE: + case OP_PING: + case OP_PONG: + return true; + default: + return false; + } + } + + private final static byte[] MAGIC; - private final static byte[] NORMAL_CLOSE=new byte[] { 1000/0xff, (byte)(1000%0xff) }; private final IdleCheck _idle; private final WebSocketParser _parser; private final WebSocketGenerator _generator; - private final WebSocket _websocket; + private final WebSocket _webSocket; + private final OnFrame _onFrame; + private final OnBinaryMessage _onBinaryMessage; + private final OnTextMessage _onTextMessage; + private final OnControl _onControl; private boolean _closedIn; private boolean _closedOut; + private int _maxTextMessageSize; + private int _maxBinaryMessageSize=-1; static { @@ -57,10 +100,13 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc private final WebSocketParser.FrameHandler _frameHandler= new WebSocketParser.FrameHandler() { private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + private ByteArrayBuffer _aggregate; private byte _opcode=-1; - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + public void onFrame(byte flags, byte opcode, Buffer buffer) { + boolean more=(flags&0x8)==0; + synchronized(WebSocketConnectionD06.this) { // Ignore incoming after a close @@ -71,65 +117,89 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc { byte[] array=buffer.array(); + // Deliver frame if websocket is a FrameWebSocket + if (_onFrame!=null) + { + if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + if (_onControl!=null && isControlFrame(opcode)) + { + if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) + return; + } + switch(opcode) { - case WebSocket.OP_CONTINUATION: + case WebSocketConnectionD06.OP_CONTINUATION: { // If text, append to the message buffer - if (_opcode==WebSocket.OP_TEXT) + if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0) { - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - - // If this is the last fragment, deliver the text buffer - if (!more) + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) { - String msg =_utf8.toString(); + // If this is the last fragment, deliver the text buffer + if (more && _onTextMessage!=null) + { + _opcode=-1; + String msg =_utf8.toString(); + _utf8.reset(); + _onTextMessage.onMessage(msg); + } + } + else + { + _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); _utf8.reset(); _opcode=-1; - _websocket.onMessage(WebSocket.OP_TEXT,msg); + } + + } + else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) + { + if (_aggregate.space()<_aggregate.length()) + { + _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + _aggregate.clear(); + _opcode=-1; + } + else + { + _aggregate.put(buffer); + + // If this is the last fragment, deliver + if (!more && _onBinaryMessage!=null) + { + try + { + _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); + } + finally + { + _opcode=-1; + _aggregate.clear(); + } + } } } - else - { - // deliver the non-text fragment - if (!more) - _opcode=-1; - _websocket.onFragment(more,_opcode,array,buffer.getIndex(),buffer.length()); - } break; } - - case WebSocket.OP_TEXT: - { - if (more) - { - // If this is a text fragment, append to buffer - _opcode=WebSocket.OP_TEXT; - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - } - else - { - // Deliver the message - _websocket.onMessage(opcode,buffer.toString(StringUtil.__UTF8)); - } - break; - } - - case WebSocket.OP_PING: + case WebSocketConnectionD06.OP_PING: { Log.debug("PING {}",this); if (!_closedOut) - getOutbound().sendMessage(WebSocket.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); + _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); break; } - case WebSocket.OP_PONG: + case WebSocketConnectionD06.OP_PONG: { Log.debug("PONG {}",this); break; } - case WebSocket.OP_CLOSE: + case WebSocketConnectionD06.OP_CLOSE: { int code=-1; String message=null; @@ -143,15 +213,64 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc break; } + + case WebSocketConnectionD06.OP_TEXT: + { + if(_onTextMessage!=null) + { + if (more) + { + if (_connection.getMaxTextMessageSize()>=0) + { + // If this is a text fragment, append to buffer + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + _opcode=WebSocketConnectionD06.OP_TEXT; + else + { + _utf8.reset(); + _opcode=-1; + _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); + } + } + } + else + { + // Deliver the message + _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); + } + } + break; + } + default: { - if (more) + if (_onBinaryMessage!=null) { - _opcode=opcode; - _websocket.onFragment(more,opcode,array,buffer.getIndex(),buffer.length()); + if (more) + { + if (_connection.getMaxBinaryMessageSize()>=0) + { + if (buffer.length()>_connection.getMaxBinaryMessageSize()) + { + _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + if (_aggregate!=null) + _aggregate.clear(); + _opcode=-1; + } + else + { + _opcode=opcode; + if (_aggregate==null) + _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); + _aggregate.put(buffer); + } + } + } + else + { + _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); + } } - else - _websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length()); } } } @@ -166,34 +285,36 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc } } + public void close(int code,String message) + { + } + public String toString() { return WebSocketConnectionD06.this.toString()+"FH"; } + }; - private final WebSocket.Outbound _outbound = new WebSocket.Outbound() + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private final WebSocket.Connection _connection = new WebSocket.Connection() { volatile boolean _disconnecting; - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(java.lang.String) - */ - public void sendMessage(String content) throws IOException - { - sendMessage(WebSocket.OP_TEXT,content); - } + int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize; + int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize; /* ------------------------------------------------------------ */ /** * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String) */ - public synchronized void sendMessage(byte opcode, String content) throws IOException + public synchronized void sendMessage(String content) throws IOException { if (_closedOut) throw new IOException("closing"); - _generator.addFrame(opcode,content,_endp.getMaxIdleTime()); + byte[] data = content.getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); @@ -203,11 +324,11 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc /** * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int) */ - public synchronized void sendMessage(byte opcode, byte[] content, int offset, int length) throws IOException + public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException { if (_closedOut) throw new IOException("closing"); - _generator.addFrame(opcode,content,offset,length,_endp.getMaxIdleTime()); + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); @@ -215,18 +336,34 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ /** - * @see org.eclipse.jetty.websocket.WebSocketConnection#sendFragment(boolean, byte, byte[], int, int) + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int) */ - public void sendFragment(boolean more,byte opcode, byte[] content, int offset, int length) throws IOException + public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException { if (_closedOut) throw new IOException("closing"); - _generator.addFragment(!more,opcode,content,offset,length,_endp.getMaxIdleTime()); + _generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime()); _generator.flush(); checkWriteable(); _idle.access(_endp); } + /* ------------------------------------------------------------ */ + public void sendControl(byte control, byte[] data, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closing"); + _generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public boolean isMore(byte flags) + { + return (flags&0x8)==0; + } /* ------------------------------------------------------------ */ public boolean isOpen() @@ -251,7 +388,31 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc _disconnecting=true; WebSocketConnectionD06.this.closeOut(1000,null); } - + + /* ------------------------------------------------------------ */ + public void setMaxTextMessageSize(int size) + { + _maxTextMessage=size; + } + + /* ------------------------------------------------------------ */ + public void setMaxBinaryMessageSize(int size) + { + _maxBinaryMessage=size; + } + + /* ------------------------------------------------------------ */ + public int getMaxTextMessageSize() + { + return _maxTextMessage; + } + + /* ------------------------------------------------------------ */ + public int getMaxBinaryMessageSize() + { + return _maxBinaryMessage; + } + }; /* ------------------------------------------------------------ */ @@ -273,7 +434,11 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc _endp.setMaxIdleTime(maxIdleTime); - _websocket = websocket; + _webSocket = websocket; + _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null; + _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null; + _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null; + _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null; _generator = new WebSocketGeneratorD06(buffers, _endp,null); _parser = new WebSocketParserD06(buffers, endpoint, _frameHandler,true); @@ -299,12 +464,15 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc {} }; } + + _maxTextMessageSize=buffers.getBufferSize(); + _maxBinaryMessageSize=-1; } /* ------------------------------------------------------------ */ - public WebSocket.Outbound getOutbound() + public WebSocket.Connection getConnection() { - return _outbound; + return _connection; } /* ------------------------------------------------------------ */ @@ -366,7 +534,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc @Override public void idleExpired() { - closeOut(WebSocket.CLOSE_NORMAL,"Idle"); + closeOut(WebSocketConnectionD06.CLOSE_NORMAL,"Idle"); } /* ------------------------------------------------------------ */ @@ -378,7 +546,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ public void closed() { - _websocket.onDisconnect(); + _webSocket.onDisconnect(WebSocketConnectionD06.CLOSE_NORMAL,""); } /* ------------------------------------------------------------ */ @@ -410,16 +578,14 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc { if (_closedIn || _closedOut) _endp.close(); - else if (code<=0) - { - _generator.addFrame(WebSocket.OP_CLOSE,NORMAL_CLOSE,0,2,_endp.getMaxIdleTime()); - } - else + else { + if (code<=0) + code=WebSocketConnectionD06.CLOSE_NORMAL; byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1); - bytes[0]=(byte)(code/0xff); - bytes[1]=(byte)(code%0xff); - _generator.addFrame(WebSocket.OP_CLOSE,bytes,0,bytes.length,_endp.getMaxIdleTime()); + bytes[0]=(byte)(code/0x100); + bytes[1]=(byte)(code%0x100); + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_CLOSE,bytes,0,bytes.length,_endp.getMaxIdleTime()); } _generator.flush(); @@ -433,7 +599,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc _closedOut=true; } } - /* ------------------------------------------------------------ */ public void fillBuffersFrom(Buffer buffer) @@ -441,7 +606,6 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc _parser.fill(buffer); } - /* ------------------------------------------------------------ */ private void checkWriteable() { @@ -471,7 +635,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc response.addHeader("Sec-WebSocket-Protocol",subprotocol); response.sendError(101); - _websocket.onConnect(_outbound); + _webSocket.onConnect(_connection); } /* ------------------------------------------------------------ */ @@ -489,6 +653,4 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc throw new RuntimeException(e); } } - - } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java index 8ba0b69b3c8..4984e9df091 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java @@ -133,4 +133,17 @@ public class WebSocketFactory // Tell jetty about the new connection request.setAttribute("org.eclipse.jetty.io.Connection",connection); } + + public static String[] parseProtocols(String protocol) + { + if (protocol==null) + return new String[]{null}; + protocol=protocol.trim(); + if (protocol==null || protocol.length()==0) + return new String[]{null}; + String[] passed = protocol.split("\\s*,\\s*"); + String[] protocols = new String[passed.length+1]; + System.arraycopy(passed,0,protocols,0,passed.length); + return protocols; + } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java index b889e464205..8869a52090f 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java @@ -24,8 +24,6 @@ public interface WebSocketGenerator { int flush() throws IOException; boolean isBufferEmpty(); - void addFrame(byte opcode, String content, int maxIdleTime) throws IOException; - void addFrame(byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException; - void addFragment(boolean last, byte opcode,byte[] content, int offset, int length, int maxIdleTime) throws IOException; + void addFrame(byte flags,byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException; int flush(int maxIdleTime) throws IOException; } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java index 5fa06f42ca3..c9f6f09a0ed 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java @@ -40,7 +40,7 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator _endp=endp; } - public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException + public synchronized void addFrame(byte flags, byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException { if (_buffer==null) _buffer=_buffers.getDirectBuffer(); @@ -98,7 +98,7 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator private synchronized boolean isLengthFrame(byte frame) { - return (frame & WebSocket.LENGTH_FRAME) == WebSocket.LENGTH_FRAME; + return (frame & WebSocketConnectionD00.LENGTH_FRAME) == WebSocketConnectionD00.LENGTH_FRAME; } private synchronized void bufferPut(byte datum, long blockFor) throws IOException @@ -110,12 +110,6 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator expelBuffer(blockFor); } - public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException - { - byte[] bytes = content.getBytes("UTF-8"); - addFrame(frame, bytes, 0, bytes.length, blockFor); - } - public synchronized int flush(int blockFor) throws IOException { return expelBuffer(blockFor); @@ -170,12 +164,5 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator { return _buffer==null || _buffer.length()==0; } - - public void addFragment(boolean last,byte opcode, byte[] content, int offset, int length, int maxIdleTime) throws IOException - { - if (!last) - throw new UnsupportedOperationException("fragmented"); - addFrame(opcode,content,offset,length,maxIdleTime); - } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java index e6b5d80cdc1..94a8ca7179a 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java @@ -38,19 +38,8 @@ public class WebSocketGeneratorD01 implements WebSocketGenerator _buffers=buffers; _endp=endp; } - - public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException - { - addFrame(opcode,content,0,content.length,blockFor); - } - - public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException - { - addFragment(true,opcode,content,offset,length,blockFor); - } - - public synchronized void addFragment(boolean last,byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException + public synchronized void addFrame(byte flags,byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException { if (_buffer==null) _buffer=_buffers.getDirectBuffer(); @@ -69,7 +58,7 @@ public class WebSocketGeneratorD01 implements WebSocketGenerator fragment=_buffer.capacity()-10; bufferPut((byte)(0x80|opcode), blockFor); } - else if (last) + else if ((flags&0x8)==0) bufferPut(opcode, blockFor); else bufferPut((byte)(0x80|opcode), blockFor); @@ -134,12 +123,6 @@ public class WebSocketGeneratorD01 implements WebSocketGenerator expelBuffer(blockFor); } - public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException - { - byte[] bytes = content.getBytes("UTF-8"); - addFrame(frame, bytes, 0, bytes.length, blockFor); - } - public synchronized int flush(int blockFor) throws IOException { return expelBuffer(blockFor); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD06.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD06.java index 328d39c2633..c956df594ff 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD06.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD06.java @@ -20,6 +20,7 @@ import java.util.Random; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.util.TypeUtil; /* ------------------------------------------------------------ */ @@ -108,34 +109,30 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator _maskGen=maskGen; } - public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException - { - _opsent=false; - addFrame(opcode,content,0,content.length,blockFor); - } - - - public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException - { - _opsent=false; - addFragment(true,opcode,content,offset,length,blockFor); - } - - public synchronized void addFragment(boolean last, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException + public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException { + // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length); + if (_buffer==null) _buffer=(_maskGen!=null)?_buffers.getBuffer():_buffers.getDirectBuffer(); + boolean last=WebSocketConnectionD06.isLastFrame(flags); + opcode=(byte)(((0xf&flags)<<4)+0xf&opcode); + int space=(_maskGen!=null)?14:10; do { - opcode = _opsent?WebSocket.OP_CONTINUATION:(byte)(opcode & 0x0f); + opcode = _opsent?WebSocketConnectionD06.OP_CONTINUATION:opcode; _opsent=true; int payload=length; if (payload+space>_buffer.capacity()) + { + // We must fragement, so clear FIN bit + opcode&=(byte)0x7F; // Clear the FIN bit payload=_buffer.capacity()-space; + } else if (last) opcode|=(byte)0x80; // Set the FIN bit @@ -150,7 +147,7 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator _m=0; _buffer.put(_mask); } - + // write the opcode and length if (payload>0xffff) { @@ -232,12 +229,6 @@ public class WebSocketGeneratorD06 implements WebSocketGenerator _buffer.put((byte)(data^_mask[+_m++%4])); } - public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException - { - byte[] bytes = content.getBytes("UTF-8"); - addFrame(frame, bytes, 0, bytes.length, blockFor); - } - public synchronized int flush(int blockFor) throws IOException { return expelBuffer(blockFor); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java index 189440f9fe4..9d1f2f9cb22 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java @@ -24,7 +24,7 @@ import org.eclipse.jetty.server.handler.HandlerWrapper; public abstract class WebSocketHandler extends HandlerWrapper { - private WebSocketFactory _websocket; + private WebSocketFactory _webSocketFactory; private int _bufferSize=8192; private int _maxIdleTime=-1; @@ -53,7 +53,7 @@ public abstract class WebSocketHandler extends HandlerWrapper */ public int getMaxIdleTime() { - return (int)(_websocket==null?_maxIdleTime:_websocket.getMaxIdleTime()); + return (int)(_webSocketFactory==null?_maxIdleTime:_webSocketFactory.getMaxIdleTime()); } /* ------------------------------------------------------------ */ @@ -63,8 +63,8 @@ public abstract class WebSocketHandler extends HandlerWrapper public void setMaxIdleTime(int maxIdleTime) { _maxIdleTime = maxIdleTime; - if (_websocket!=null) - _websocket.setMaxIdleTime(maxIdleTime); + if (_webSocketFactory!=null) + _webSocketFactory.setMaxIdleTime(maxIdleTime); } /* ------------------------------------------------------------ */ @@ -74,9 +74,9 @@ public abstract class WebSocketHandler extends HandlerWrapper @Override protected void doStart() throws Exception { - _websocket=new WebSocketFactory(_bufferSize); + _webSocketFactory=new WebSocketFactory(_bufferSize); if (_maxIdleTime>=0) - _websocket.setMaxIdleTime(_maxIdleTime); + _webSocketFactory.setMaxIdleTime(_maxIdleTime); super.doStart(); } @@ -88,7 +88,7 @@ public abstract class WebSocketHandler extends HandlerWrapper protected void doStop() throws Exception { super.doStop(); - _websocket=null; + _webSocketFactory=null; } /* ------------------------------------------------------------ */ @@ -97,17 +97,27 @@ public abstract class WebSocketHandler extends HandlerWrapper { if ("websocket".equalsIgnoreCase(request.getHeader("Upgrade"))) { - String subprotocol=request.getHeader("Sec-WebSocket-Protocol"); - if (subprotocol==null) // TODO remove once draft period is over - subprotocol=request.getHeader("WebSocket-Protocol"); - WebSocket websocket=doWebSocketConnect(request,subprotocol); + String protocol=request.getHeader("Sec-WebSocket-Protocol"); + if (protocol==null) // TODO remove once draft period is over + protocol=request.getHeader("WebSocket-Protocol"); + + WebSocket websocket=null; + for (String p :WebSocketFactory.parseProtocols(protocol)) + { + websocket=doWebSocketConnect(request,p); + if (websocket!=null) + { + protocol=p; + break; + } + } String host=request.getHeader("Host"); String origin=request.getHeader("Origin"); origin=checkOrigin(request,host,origin); if (websocket!=null) - _websocket.upgrade(request,response,websocket,origin,subprotocol); + _webSocketFactory.upgrade(request,response,websocket,origin,protocol); else response.sendError(503); } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParser.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParser.java index 96f99c7732d..a6215ed1171 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParser.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParser.java @@ -30,7 +30,8 @@ public interface WebSocketParser /* ------------------------------------------------------------ */ public interface FrameHandler { - void onFrame(boolean more,byte flags, byte opcode, Buffer buffer); + void onFrame(byte flags, byte opcode, Buffer buffer); + void close(int code,String message); } Buffer getBuffer(); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD00.java index 16983f06b82..eaf2206583e 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD00.java @@ -145,7 +145,7 @@ public class WebSocketParserD00 implements WebSocketParser { _state=STATE_START; int l=_buffer.getIndex()-_buffer.markIndex()-1; - _handler.onFrame(false,(byte)0,_opcode,_buffer.sliceFromMark(l)); + _handler.onFrame((byte)0,_opcode,_buffer.sliceFromMark(l)); _buffer.setMarkIndex(-1); if (_buffer.length()==0) { @@ -173,7 +173,7 @@ public class WebSocketParserD00 implements WebSocketParser Buffer data=_buffer.sliceFromMark(_length); _buffer.skip(_length); _state=STATE_START; - _handler.onFrame(false,(byte)0, _opcode, data); + _handler.onFrame((byte)0, _opcode, data); if (_buffer.length()==0) { diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java index 899708d9d65..eccd7800285 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java @@ -52,7 +52,6 @@ public class WebSocketParserD01 implements WebSocketParser private final FrameHandler _handler; private State _state=State.START; private Buffer _buffer; - private boolean _more; private byte _flags; private byte _opcode; private int _count; @@ -142,7 +141,6 @@ public class WebSocketParserD01 implements WebSocketParser b=_buffer.get(); _opcode=(byte)(b&0xf); _flags=(byte)(b>>4); - _more=(_flags&8)!=0; _state=State.LENGTH_7; continue; @@ -195,7 +193,7 @@ public class WebSocketParserD01 implements WebSocketParser if (_state==State.DATA && available>=_count) { - _handler.onFrame(_more,_flags, _opcode, _buffer.get(_count)); + _handler.onFrame(_flags, _opcode, _buffer.get(_count)); _count=0; _state=State.START; diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD06.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD06.java index 8ad43612ade..5fc8c43c967 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD06.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD06.java @@ -32,18 +32,19 @@ import org.eclipse.jetty.util.log.Log; public class WebSocketParserD06 implements WebSocketParser { public enum State { - MASK(0), OPCODE(1), LENGTH_7(2), LENGTH_16(4), LENGTH_63(10), DATA(10); + + START(0), MASK(4), OPCODE(1), LENGTH_7(1), LENGTH_16(2), LENGTH_63(8), DATA(0), SKIP(1); - int _minSize; + int _needs; - State(int minSize) + State(int needs) { - _minSize=minSize; + _needs=needs; } - int getMinSize() + int getNeeds() { - return _minSize; + return _needs; } }; @@ -54,12 +55,10 @@ public class WebSocketParserD06 implements WebSocketParser private final boolean _masked; private State _state; private Buffer _buffer; - private boolean _fin; private byte _flags; private byte _opcode; - private int _count; + private int _bytesNeeded; private long _length; - private Utf8StringBuilder _utf8; private final byte[] _mask = new byte[4]; private int _m; @@ -77,7 +76,7 @@ public class WebSocketParserD06 implements WebSocketParser _endp=endp; _handler=handler; _masked=masked; - _state=_masked?State.MASK:State.OPCODE; + _state=State.START; } /* ------------------------------------------------------------ */ @@ -113,21 +112,21 @@ public class WebSocketParserD06 implements WebSocketParser int available=_buffer.length(); // Fill buffer if we need a byte or need length - if (available < (_state.getMinSize() + (_masked?4:0)) || _state==State.DATA && available<_count) + while (available<(_state==State.SKIP?1:_bytesNeeded)) { // compact to mark (set at start of data) _buffer.compact(); // if no space, then the data is too big for buffer if (_buffer.space() == 0) - throw new IllegalStateException("FULL"); + throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity()); // catch IOExceptions (probably EOF) and try to parse what we have try { int filled=_endp.isOpen()?_endp.fill(_buffer):-1; if (filled<=0) - return total_filled; + return total_filled>0?total_filled:filled; total_filled+=filled; available=_buffer.length(); } @@ -139,85 +138,125 @@ public class WebSocketParserD06 implements WebSocketParser } // if we are here, then we have sufficient bytes to process the current state. - + // Parse the buffer byte by byte (unless it is STATE_DATA) byte b; - while (_state!=State.DATA && available-->0) + while (_state!=State.DATA && available>=(_state==State.SKIP?1:_bytesNeeded)) { switch (_state) { + case START: + _state=_masked?State.MASK:State.OPCODE; + _bytesNeeded=_state.getNeeds(); + continue; + case MASK: _buffer.get(_mask,0,4); + available-=4; _state=State.OPCODE; + _bytesNeeded=_state.getNeeds(); _m=0; continue; case OPCODE: b=_buffer.get(); + available--; if (_masked) b^=_mask[_m++%4]; _opcode=(byte)(b&0xf); - _flags=(byte)(b>>4); - _fin=(_flags&8)!=0; - _state=State.LENGTH_7; + _flags=(byte)(0xf&(b>>4)); + + if (WebSocketConnectionD06.isControlFrame(_opcode)&&!WebSocketConnectionD06.isLastFrame(_flags)) + { + _state=State.SKIP; + _handler.close(WebSocketConnectionD06.CLOSE_PROTOCOL,"fragmented control"); + } + else + _state=State.LENGTH_7; + + _bytesNeeded=_state.getNeeds(); continue; case LENGTH_7: b=_buffer.get(); + available--; if (_masked) b^=_mask[_m++%4]; switch(b) { case 127: _length=0; - _count=8; _state=State.LENGTH_63; + _bytesNeeded=_state.getNeeds(); break; case 126: _length=0; - _count=2; _state=State.LENGTH_16; + _bytesNeeded=_state.getNeeds(); break; default: _length=(0x7f&b); - _count=(int)_length; + _bytesNeeded=(int)_length; _state=State.DATA; } continue; case LENGTH_16: b=_buffer.get(); + available--; if (_masked) b^=_mask[_m++%4]; - _length = _length<<8 | b; - if (--_count==0) + _length = _length*0x100 + (0xff&b); + if (--_bytesNeeded==0) { - if (_length>=_buffer.capacity()-4) - throw new IllegalStateException("TOO LARGE"); - _count=(int)_length; - _state=State.DATA; + _bytesNeeded=(int)_length; + if (_length>_buffer.capacity()) + { + _state=State.SKIP; + _handler.close(WebSocketConnectionD06.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity()); + } + else + { + _state=State.DATA; + } } continue; case LENGTH_63: b=_buffer.get(); + available--; if (_masked) b^=_mask[_m++%4]; - _length = _length<<8 | b; - if (--_count==0) + _length = _length*0x100 + (0xff&b); + if (--_bytesNeeded==0) { - if (_length>=_buffer.capacity()-10) - throw new IllegalStateException("TOO LARGE"); - _count=(int)_length; - _state=State.DATA; + _bytesNeeded=(int)_length; + if (_length>=_buffer.capacity()) + { + _state=State.SKIP; + _handler.close(WebSocketConnectionD06.CLOSE_LARGE,"frame size "+_length+">"+_buffer.capacity()); + } + else + { + _state=State.DATA; + } } continue; + + case SKIP: + int skip=Math.min(available,_bytesNeeded); + _buffer.skip(skip); + available-=skip; + _bytesNeeded-=skip; + if (_bytesNeeded==0) + _state=State.START; + } } - if (_state==State.DATA && available>=_count) + if (_state==State.DATA && available>=_bytesNeeded) { - Buffer data =_buffer.get(_count); + Buffer data =_buffer.get(_bytesNeeded); if (_masked) { if (data.array()==null) @@ -227,10 +266,11 @@ public class WebSocketParserD06 implements WebSocketParser for (int i=data.getIndex();i>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length()); + _handler.onFrame(_flags, _opcode, data); + _bytesNeeded=0; + _state=State.START; if (_buffer.length()==0) { diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java index fed192700e2..ce3ef693d88 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java @@ -67,7 +67,17 @@ public abstract class WebSocketServlet extends HttpServlet String protocol=request.getHeader("Sec-WebSocket-Protocol"); if (protocol==null) // TODO remove once draft period is over protocol=request.getHeader("WebSocket-Protocol"); - WebSocket websocket=doWebSocketConnect(request,protocol); + + WebSocket websocket=null; + for (String p :WebSocketFactory.parseProtocols(protocol)) + { + websocket=doWebSocketConnect(request,p); + if (websocket!=null) + { + protocol=p; + break; + } + } String host=request.getHeader("Host"); String origin=request.getHeader("Origin"); diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java index 31bea9e67ef..68fb431169c 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java @@ -4,11 +4,12 @@ import static junit.framework.Assert.assertEquals; import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.util.StringUtil; import org.junit.Before; import org.junit.Test; /** - * @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $ + * @version $Revision$ $Date$ */ public class WebSocketGeneratorD00Test { @@ -28,7 +29,8 @@ public class WebSocketGeneratorD00Test @Test public void testOneString() throws Exception { - _generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0); + byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x0,(byte)0x04,data,0,data.length,0); _generator.flush(); assertEquals(4,_out.get()); assertEquals(15,_out.get()); @@ -56,7 +58,7 @@ public class WebSocketGeneratorD00Test for (int i=0;i "+message); diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD00Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD00Test.java index 00324d22515..2344ba679a5 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD00Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD00Test.java @@ -227,9 +227,9 @@ public class WebSocketMessageD00Test { boolean onConnect=false; private final CountDownLatch latch = new CountDownLatch(1); - private volatile Outbound outbound; + private volatile Connection outbound; - public void onConnect(Outbound outbound) + public void onConnect(Connection outbound) { this.outbound = outbound; if (onConnect) @@ -251,19 +251,7 @@ public class WebSocketMessageD00Test return latch.await(time, TimeUnit.MILLISECONDS); } - public void onMessage(byte frame, String data) - { - } - - public void onMessage(byte frame, byte[] data, int offset, int length) - { - } - - public void onDisconnect() - { - } - - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) + public void onDisconnect(int code,String message) { } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD01Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD01Test.java index 3fd19dae22f..ee10ec710bd 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD01Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD01Test.java @@ -283,9 +283,9 @@ public class WebSocketMessageD01Test boolean onConnect=false; private final CountDownLatch connected = new CountDownLatch(1); private final CountDownLatch disconnected = new CountDownLatch(1); - private volatile Outbound outbound; + private volatile Connection outbound; - public void onConnect(Outbound outbound) + public void onConnect(Connection outbound) { this.outbound = outbound; if (onConnect) @@ -312,21 +312,10 @@ public class WebSocketMessageD01Test return disconnected.await(time, TimeUnit.MILLISECONDS); } - public void onMessage(byte frame, String data) - { - } - - public void onMessage(byte frame, byte[] data, int offset, int length) - { - } - - public void onDisconnect() + public void onDisconnect(int code,String message) { disconnected.countDown(); } - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) - { - } } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java index fa95fc45a32..573be26f27e 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java @@ -51,6 +51,7 @@ public class WebSocketMessageD06Test _serverWebSocket = new TestWebSocket(); _serverWebSocket.onConnect=("onConnect".equals(protocol)); _serverWebSocket.echo=("echo".equals(protocol)); + _serverWebSocket.aggregate=("aggregate".equals(protocol)); return _serverWebSocket; } }; @@ -102,7 +103,7 @@ public class WebSocketMessageD06Test skipTo("\r\n\r\n",input); assertTrue(_serverWebSocket.awaitConnected(1000)); - assertNotNull(_serverWebSocket.outbound); + assertNotNull(_serverWebSocket.connection); // Server sends a big message StringBuilder message = new StringBuilder(); @@ -110,9 +111,9 @@ public class WebSocketMessageD06Test for (int i = 0; i < (0x2000) / text.length(); i++) message.append(text); String data=message.toString(); - _serverWebSocket.outbound.sendMessage(data); + _serverWebSocket.connection.sendMessage(data); - assertEquals(WebSocket.OP_TEXT,input.read()); + assertEquals(WebSocketConnectionD06.OP_TEXT,input.read()); assertEquals(0x7e,input.read()); assertEquals(0x1f,input.read()); assertEquals(0xf6,input.read()); @@ -150,7 +151,7 @@ public class WebSocketMessageD06Test skipTo("\r\n\r\n",input); assertTrue(_serverWebSocket.awaitConnected(1000)); - assertNotNull(_serverWebSocket.outbound); + assertNotNull(_serverWebSocket.connection); assertEquals(0x84,input.read()); assertEquals(0x0f,input.read()); @@ -194,7 +195,7 @@ public class WebSocketMessageD06Test skipTo("\r\n\r\n",input); assertTrue(_serverWebSocket.awaitConnected(1000)); - assertNotNull(_serverWebSocket.outbound); + assertNotNull(_serverWebSocket.connection); assertEquals(0x84,input.read()); assertEquals(0x0f,input.read()); @@ -234,12 +235,282 @@ public class WebSocketMessageD06Test skipTo("\r\n\r\n",input); assertTrue(_serverWebSocket.awaitConnected(1000)); - assertNotNull(_serverWebSocket.outbound); + assertNotNull(_serverWebSocket.connection); socket.setSoTimeout(1000); assertEquals(0x83,input.read()); assertEquals(0x00,input.read()); } + + @Test + public void testMaxTextSize() throws Exception + { + Socket socket = new Socket("localhost", _connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: 6\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(1000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(_serverWebSocket.awaitConnected(1000)); + assertNotNull(_serverWebSocket.connection); + + _serverWebSocket.getConnection().setMaxTextMessageSize(15); + + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0x04^0xff); + output.write(0x0a^0xff); + byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15 chars",input); + } + + + @Test + public void testMaxTextSize2() throws Exception + { + Socket socket = new Socket("localhost", _connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: 6\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(100000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(_serverWebSocket.awaitConnected(1000)); + assertNotNull(_serverWebSocket.connection); + + _serverWebSocket.getConnection().setMaxTextMessageSize(15); + + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0x04^0xff); + output.write(0x14^0xff); + byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15 chars",input); + } + + @Test + public void testBinaryAggregate() throws Exception + { + Socket socket = new Socket("localhost", _connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: aggregate\r\n" + + "Sec-WebSocket-Version: 6\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(1000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(_serverWebSocket.awaitConnected(1000)); + assertNotNull(_serverWebSocket.connection); + _serverWebSocket.getConnection().setMaxBinaryMessageSize(1024); + + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(WebSocketConnectionD06.OP_BINARY^0xff); + output.write(0x0a^0xff); + byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15",input); + } + + + @Test + public void testMaxBinarySize2() throws Exception + { + Socket socket = new Socket("localhost", _connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: 6\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(100000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(_serverWebSocket.awaitConnected(1000)); + assertNotNull(_serverWebSocket.connection); + + _serverWebSocket.getConnection().setMaxBinaryMessageSize(15); + + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0x0f^0xff); + output.write(0x14^0xff); + byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15",input); + } @Test public void testIdle() throws Exception @@ -269,7 +540,7 @@ public class WebSocketMessageD06Test skipTo("\r\n\r\n",input); assertTrue(_serverWebSocket.awaitConnected(1000)); - assertNotNull(_serverWebSocket.outbound); + assertNotNull(_serverWebSocket.connection); assertEquals(0x84,input.read()); assertEquals(0x0f,input.read()); @@ -277,8 +548,8 @@ public class WebSocketMessageD06Test assertEquals((byte)0x81,(byte)input.read()); assertEquals(0x06,input.read()); - assertEquals(1000/0xff,input.read()); - assertEquals(1000%0xff,input.read()); + assertEquals(1000/0x100,input.read()); + assertEquals(1000%0x100,input.read()); lookFor("Idle",input); // respond to close @@ -294,7 +565,7 @@ public class WebSocketMessageD06Test assertTrue(_serverWebSocket.awaitDisconnected(5000)); try { - _serverWebSocket.outbound.sendMessage("Don't send"); + _serverWebSocket.connection.sendMessage("Don't send"); assertTrue(false); } catch(IOException e) @@ -332,7 +603,7 @@ public class WebSocketMessageD06Test assertTrue(_serverWebSocket.awaitConnected(1000)); - assertNotNull(_serverWebSocket.outbound); + assertNotNull(_serverWebSocket.connection); assertEquals(0x84,input.read()); assertEquals(0x0f,input.read()); @@ -344,7 +615,7 @@ public class WebSocketMessageD06Test try { - _serverWebSocket.outbound.sendMessage("Don't send"); + _serverWebSocket.connection.sendMessage("Don't send"); assertTrue(false); } catch(IOException e) @@ -361,16 +632,23 @@ public class WebSocketMessageD06Test ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096); WebSocketGeneratorD06 gen = new WebSocketGeneratorD06(new WebSocketBuffers(8096),endp,null); - gen.addFrame((byte)0x4,message,1000); + + byte[] data = message.getBytes(StringUtil.__UTF8); + gen.addFrame((byte)0x8,(byte)0x4,data,0,data.length,1000); endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096); WebSocketParserD06 parser = new WebSocketParserD06(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler() { - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + public void onFrame(byte flags, byte opcode, Buffer buffer) { received.set(buffer.toString()); } + + public void close(int code,String message) + { + } + },false); parser.parseNext(); @@ -388,16 +666,21 @@ public class WebSocketMessageD06Test WebSocketGeneratorD06.MaskGen maskGen = new WebSocketGeneratorD06.RandomMaskGen(); WebSocketGeneratorD06 gen = new WebSocketGeneratorD06(new WebSocketBuffers(8096),endp,maskGen); - gen.addFrame((byte)0x4,message,1000); + byte[] data = message.getBytes(StringUtil.__UTF8); + gen.addFrame((byte)0x8,(byte)0x4,data,0,data.length,1000); endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096); WebSocketParserD06 parser = new WebSocketParserD06(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler() { - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + public void onFrame(byte flags, byte opcode, Buffer buffer) { received.set(buffer.toString()); } + + public void close(int code,String message) + { + } },true); parser.parseNext(); @@ -455,22 +738,28 @@ public class WebSocketMessageD06Test } - private static class TestWebSocket implements WebSocket + private static class TestWebSocket implements WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage { boolean onConnect=false; boolean echo=true; + boolean aggregate=false; private final CountDownLatch connected = new CountDownLatch(1); private final CountDownLatch disconnected = new CountDownLatch(1); - private volatile Outbound outbound; + private volatile Connection connection; - public void onConnect(Outbound outbound) + public Connection getConnection() { - this.outbound = outbound; + return connection; + } + + public void onConnect(Connection connection) + { + this.connection = connection; if (onConnect) { try { - outbound.sendMessage("sent on connect"); + connection.sendMessage("sent on connect"); } catch(IOException e) { @@ -490,54 +779,65 @@ public class WebSocketMessageD06Test return disconnected.await(time, TimeUnit.MILLISECONDS); } - public void onMessage(byte opcode, String data) - { - if (echo) - { - try - { - outbound.sendMessage(opcode,data); - } - catch(IOException e) - { - e.printStackTrace(); - } - } - } - - public void onMessage(byte opcode, byte[] data, int offset, int length) - { - if (echo) - { - try - { - outbound.sendMessage(opcode,data,offset,length); - } - catch(IOException e) - { - e.printStackTrace(); - } - } - } - - public void onDisconnect() + public void onDisconnect(int code,String message) { disconnected.countDown(); } - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) + public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length) { if (echo) + { + switch(opcode) + { + case WebSocketConnectionD06.OP_CLOSE: + case WebSocketConnectionD06.OP_PING: + case WebSocketConnectionD06.OP_PONG: + break; + + default: + try + { + connection.sendFrame(flags,opcode,data,offset,length); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + return false; + } + + public void onMessage(byte[] data, int offset, int length) + { + if (aggregate) { try { - outbound.sendFragment(more,opcode,data,offset,length); + connection.sendMessage(data,offset,length); } - catch(IOException e) + catch (IOException e) { e.printStackTrace(); } } } + + public void onMessage(String data) + { + if (aggregate) + { + try + { + connection.sendMessage(data); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageW75Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageW75Test.java index a1927cb4e5f..373b774f63c 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageW75Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageW75Test.java @@ -147,7 +147,7 @@ public class WebSocketMessageW75Test for (int i = 0; i < 64 * 1024 / text.length(); ++i) message.append(text); byte[] data = message.toString().getBytes("UTF-8"); - _serverWebSocket.outbound.sendMessage(WebSocket.LENGTH_FRAME, data,0,data.length); + _serverWebSocket.outbound.sendMessage(data,0,data.length); // Length of the message is 65536, so the length will be encoded as 0x84 0x80 0x00 int frame = input.read(); @@ -170,9 +170,9 @@ public class WebSocketMessageW75Test private static class TestWebSocket implements WebSocket { private final CountDownLatch latch = new CountDownLatch(1); - private volatile Outbound outbound; + private volatile Connection outbound; - public void onConnect(Outbound outbound) + public void onConnect(Connection outbound) { this.outbound = outbound; latch.countDown(); @@ -183,20 +183,9 @@ public class WebSocketMessageW75Test return latch.await(time, TimeUnit.MILLISECONDS); } - public void onMessage(byte frame, String data) + public void onDisconnect(int code,String data) { } - public void onMessage(byte frame, byte[] data, int offset, int length) - { - } - - public void onDisconnect() - { - } - - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) - { - } } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD00Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD00Test.java index 6f3f0460e34..bfdcf1bae35 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD00Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD00Test.java @@ -17,7 +17,7 @@ import org.junit.Before; import org.junit.Test; /** - * @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $ + * @version $Revision$ $Date$ */ public class WebSocketParserD00Test { @@ -141,9 +141,13 @@ public class WebSocketParserD00Test { public List _data = new ArrayList(); - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + public void onFrame(byte flags, byte opcode, Buffer buffer) { _data.add(buffer.toString(StringUtil.__UTF8)); } + + public void close(int code,String message) + { + } } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD01Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD01Test.java index 0cb2e6297e7..552e6ca9452 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD01Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD01Test.java @@ -18,7 +18,7 @@ import org.junit.Before; import org.junit.Test; /** - * @version $Revision: 1441 $ $Date: 2010-04-02 12:28:17 +0200 (Fri, 02 Apr 2010) $ + * @version $Revision$ $Date$ */ public class WebSocketParserD01Test { @@ -169,9 +169,9 @@ public class WebSocketParserD01Test Utf8StringBuilder _utf8 = new Utf8StringBuilder(); public List _data = new ArrayList(); - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + public void onFrame(byte flags, byte opcode, Buffer buffer) { - if (more) + if ((flags&0x8)!=0) _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); else if (_utf8.length()==0) _data.add(opcode,buffer.toString("utf-8")); @@ -182,5 +182,9 @@ public class WebSocketParserD01Test _utf8.reset(); } } + + public void close(int code,String message) + { + } } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD06Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD06Test.java index 1f1d66bbfe2..f3d99f8f94b 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD06Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD06Test.java @@ -13,7 +13,6 @@ import org.eclipse.jetty.io.BufferCache.CachedBuffer; import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.ByteArrayEndPoint; import org.eclipse.jetty.util.StringUtil; -import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.Utf8StringBuilder; import org.junit.Before; import org.junit.Test; @@ -94,6 +93,22 @@ public class WebSocketParserD06Test assertEquals(HttpHeaderValues.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValues.CACHE.lookup("Upgrade")).getOrdinal()); } + @Test + public void testFlagsOppcode() throws Exception + { + _in.sendMask(); + _in.put((byte)0xff); + _in.put((byte)0); + + int filled =_parser.parseNext(); + + assertEquals(6,filled); + assertEquals(0xf,_handler._flags); + assertEquals(0xf,_handler._opcode); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + @Test public void testShortText() throws Exception { @@ -107,6 +122,8 @@ public class WebSocketParserD06Test assertEquals(17,filled); assertEquals("Hello World",_handler._data.get(0)); + assertEquals(0x8,_handler._flags); + assertEquals(0x4,_handler._opcode); assertTrue(_parser.isBufferEmpty()); assertTrue(_parser.getBuffer()==null); } @@ -126,6 +143,8 @@ public class WebSocketParserD06Test assertEquals(bytes.length+6,filled); assertEquals(string,_handler._data.get(0)); + assertEquals(0x8,_handler._flags); + assertEquals(0x4,_handler._opcode); assertTrue(_parser.isBufferEmpty()); assertTrue(_parser.getBuffer()==null); } @@ -138,8 +157,8 @@ public class WebSocketParserD06Test string = string+string; string += ". The end."; - byte[] bytes = string.getBytes("UTF-8"); - + byte[] bytes = string.getBytes(StringUtil.__UTF8); + _in.sendMask(); _in.put((byte)0x84); _in.put((byte)0x7E); @@ -151,6 +170,8 @@ public class WebSocketParserD06Test assertEquals(bytes.length+8,filled); assertEquals(string,_handler._data.get(0)); + assertEquals(0x8,_handler._flags); + assertEquals(0x4,_handler._opcode); assertTrue(_parser.isBufferEmpty()); assertTrue(_parser.getBuffer()==null); } @@ -158,14 +179,12 @@ public class WebSocketParserD06Test @Test public void testLongText() throws Exception { - WebSocketBuffers buffers = new WebSocketBuffers(0x20000); ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); WebSocketParser parser=new WebSocketParserD06(buffers, endPoint,_handler,false); ByteArrayBuffer in = new ByteArrayBuffer(0x20000); endPoint.setIn(in); - String string = "Hell\uFF4f Big W\uFF4Frld "; for (int i=0;i<12;i++) string = string+string; @@ -221,15 +240,59 @@ public class WebSocketParserD06Test assertTrue(_parser.getBuffer()==null); } + @Test + public void testFrameTooLarge() throws Exception + { + _in.sendMask(); + _in.put((byte)0x84); + _in.put((byte)0x7E); + _in.put((byte)(2048>>8)); + _in.put((byte)(2048&0xff)); + + int filled =_parser.parseNext(); + + assertEquals(8,filled); + + assertEquals(WebSocketConnectionD06.CLOSE_LARGE,_handler._code); + for (int i=0;i<2048;i++) + _in.put((byte)'a'); + filled =_parser.parseNext(); + + assertEquals(2048,filled); + assertEquals(0,_handler._data.size()); + assertEquals(0,_handler._utf8.length()); + + _handler._code=0; + _handler._message=null; + + _in.sendMask(); + _in.put((byte)0x84); + _in.put((byte)0x7E); + _in.put((byte)(1024>>8)); + _in.put((byte)(1024&0xff)); + for (int i=0;i<1024;i++) + _in.put((byte)'a'); + + filled =_parser.parseNext(); + assertEquals(1024+8,filled); + assertEquals(1,_handler._data.size()); + assertEquals(1024,_handler._data.get(0).length()); + } private class Handler implements WebSocketParser.FrameHandler { Utf8StringBuilder _utf8 = new Utf8StringBuilder(); public List _data = new ArrayList(); + private byte _flags; + private byte _opcode; + int _code; + String _message; - public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + public void onFrame(byte flags, byte opcode, Buffer buffer) { - if (more) + _flags=flags; + _opcode=opcode; + if ((flags&0x8)==0) _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); else if (_utf8.length()==0) _data.add(buffer.toString("utf-8")); @@ -240,5 +303,11 @@ public class WebSocketParserD06Test _utf8.reset(); } } + + public void close(int code,String message) + { + _code=code; + _message=message; + } } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketPingD06.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketPingD06.java deleted file mode 100644 index ea313b6e65d..00000000000 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketPingD06.java +++ /dev/null @@ -1,200 +0,0 @@ -package org.eclipse.jetty.websocket; - - -import java.io.BufferedReader; -import java.io.BufferedWriter; -import java.io.IOException; -import java.io.InputStreamReader; -import java.io.OutputStreamWriter; -import java.net.Socket; -import java.security.SecureRandom; -import java.util.Random; -import java.util.concurrent.BlockingQueue; -import java.util.concurrent.LinkedBlockingQueue; - -import junit.framework.Assert; - -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.bio.SocketEndPoint; -import org.eclipse.jetty.util.B64Code; -import org.eclipse.jetty.util.TypeUtil; - -/** - * @version $Revision$ $Date$ - */ -public class WebSocketPingD06 -{ - private final static Random __random = new SecureRandom(); - private final String _host; - private final int _port; - private final int _size=64; - private final Socket _socket; - private final BufferedWriter _output; - private final BufferedReader _input; - private final SocketEndPoint _endp; - private final WebSocketGeneratorD06 _generator; - private final WebSocketParserD06 _parser; - private int _sent; - private int _received; - private long _totalTime; - private long _minDuration=Long.MAX_VALUE; - private long _maxDuration=Long.MIN_VALUE; - private long _start; - private BlockingQueue _starts = new LinkedBlockingQueue(); - private BlockingQueue _pending = new LinkedBlockingQueue(); - private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler() - { - public synchronized void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) - { - - long start=_starts.poll(); - String data=_pending.poll(); - - while (!data.equals(TypeUtil.toHexString(buffer.asArray())) && !_starts.isEmpty() && !_pending.isEmpty()) - { - // Missed response - start=_starts.poll(); - data=_pending.poll(); - } - - _received++; - - long duration = System.nanoTime()-start; - if (duration>_maxDuration) - _maxDuration=duration; - if (duration<_minDuration) - _minDuration=duration; - _totalTime+=duration; - System.out.print(buffer.length()+" bytes from "+_host+": req="+_received+" time="); - System.out.printf("%.1fms\n",((double)duration/1000000.0)); - - } - }; - - - public WebSocketPingD06(String host, int port,int timeoutMS) throws IOException - { - _host=host; - _port=port; - _socket = new Socket(host, port); - _socket.setSoTimeout(timeoutMS); - _output = new BufferedWriter(new OutputStreamWriter(_socket.getOutputStream(), "ISO-8859-1")); - _input = new BufferedReader(new InputStreamReader(_socket.getInputStream(), "ISO-8859-1")); - - _endp=new SocketEndPoint(_socket); - _generator = new WebSocketGeneratorD06(new WebSocketBuffers(32*1024),_endp,new WebSocketGeneratorD06.FixedMaskGen()); - _parser = new WebSocketParserD06(new WebSocketBuffers(32*1024),_endp,_handler,false); - - - - } - - private void open() throws IOException - { - System.out.println("Jetty WebSocket PING "+_host+":"+_port+ - " ("+_socket.getRemoteSocketAddress()+") " +_size+" bytes of data."); - byte[] key = new byte[16]; - __random.nextBytes(key); - - _output.write("GET /chat HTTP/1.1\r\n"+ - "Host: "+_host+":"+_port+"\r\n"+ - "Upgrade: websocket\r\n"+ - "Connection: Upgrade\r\n"+ - "Sec-WebSocket-Key: "+new String(B64Code.encode(key))+"\r\n"+ - "Sec-WebSocket-Origin: http://example.com\r\n"+ - "Sec-WebSocket-Protocol: lws-mirror-protocol\r\n" + - "Sec-WebSocket-Version: 6\r\n"+ - "\r\n"); - _output.flush(); - - String responseLine = _input.readLine(); - if(!responseLine.startsWith("HTTP/1.1 101 Switching Protocols")) - throw new IOException(responseLine); - // Read until we find Response key - String line; - boolean accepted=false; - String protocol=""; - while ((line = _input.readLine()) != null) - { - if (line.length() == 0) - break; - if (line.startsWith("Sec-WebSocket-Accept:")) - { - String accept=line.substring(21).trim(); - accepted=accept.equals(WebSocketConnectionD06.hashKey(new String(B64Code.encode(key)))); - } - else if (line.startsWith("Sec-WebSocket-Protocol:")) - { - protocol=line.substring(24).trim(); - } - } - - if (!accepted) - throw new IOException("Bad Sec-WebSocket-Accept"); - System.out.println("handshake OK for protocol "+protocol); - - new Thread() - { - public void run() - { - while (_endp.isOpen()) - _parser.parseNext(); - } - }.start(); - } - - public void run() - { - _start=System.currentTimeMillis(); - for (int i=0;i<10;i++) - { - try - { - byte data[] = new byte[_size]; - __random.nextBytes(data); - - - _starts.add(System.nanoTime()); - _pending.add(TypeUtil.toHexString(data)); - _sent++; - _generator.addFrame(WebSocket.OP_PING,data,_socket.getSoTimeout()); - _generator.flush(_socket.getSoTimeout()); - - Thread.sleep(1000); - - } - catch (Exception x) - { - throw new RuntimeException(x); - } - } - } - - - public void dump() throws IOException - { - _socket.close(); - long duration=System.currentTimeMillis()-_start; - System.out.println("--- "+_host+" websocket ping statistics using 1 connection ---"); - System.out.println(_sent+" packets transmitted, "+_received+" received, "+ - String.format("%d",100*(_sent-_received)/_sent)+"% loss, time "+duration+"ms"); - System.out.printf("rtt min/ave/max = %.3f/%.3f/%.3f ms\n",_minDuration/1000000.0,_totalTime/_received/1000000.0,_maxDuration/1000000.0); - } - - public static void main(String[] args) - throws Exception - { - - WebSocketPingD06 ping = new WebSocketPingD06("localhost",8080,10000); - - try - { - ping.open(); - ping.run(); - } - finally - { - ping.dump(); - } - } -} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTestServer.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTestServer.java deleted file mode 100644 index 0e30575fa86..00000000000 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTestServer.java +++ /dev/null @@ -1,102 +0,0 @@ -package org.eclipse.jetty.websocket; - -import java.io.IOException; -import java.util.concurrent.ConcurrentLinkedQueue; - -import javax.servlet.http.HttpServletRequest; - -import org.eclipse.jetty.server.Server; -import org.eclipse.jetty.server.nio.SelectChannelConnector; -import org.eclipse.jetty.util.TypeUtil; -import org.eclipse.jetty.util.log.Log; - -public class WebSocketTestServer extends Server -{ - TestWebSocket _websocket; - SelectChannelConnector _connector; - WebSocketHandler _handler; - ConcurrentLinkedQueue _webSockets = new ConcurrentLinkedQueue(); - - public WebSocketTestServer() - { - _connector = new SelectChannelConnector(); - _connector.setPort(8080); - - addConnector(_connector); - _handler = new WebSocketHandler() - { - @Override - protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) - { - _websocket = new TestWebSocket(); - return _websocket; - } - }; - - setHandler(_handler); - } - - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - class TestWebSocket implements WebSocket - { - Outbound _outbound; - - public void onConnect(Outbound outbound) - { - System.err.println("onConnect"); - _outbound = outbound; - _webSockets.add(this); - - } - - public void onMessage(byte frame, byte[] data, int offset, int length) - { - System.err.println("onMessage " + TypeUtil.toHexString(data,offset,length)); - } - - public void onMessage(final byte frame, final String data) - { - System.err.println("onMessage " + data); - for (TestWebSocket ws : _webSockets) - { - if (ws != this) - { - try - { - if (ws._outbound.isOpen()) - ws._outbound.sendMessage(frame, data); - } - catch (IOException e) - { - e.printStackTrace(); - } - } - } - } - - public void onDisconnect() - { - _webSockets.remove(this); - } - - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) - { - } - } - - public static void main(String[] args) - { - try - { - WebSocketTestServer server = new WebSocketTestServer(); - server.start(); - server.join(); - } - catch (Exception e) - { - Log.warn(e); - } - } - -} diff --git a/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java b/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java index faa756773c6..6c482ab3a68 100644 --- a/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java +++ b/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java @@ -32,14 +32,14 @@ public class WebSocketChatServlet extends WebSocketServlet /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - class ChatWebSocket implements WebSocket + class ChatWebSocket implements WebSocket.OnTextMessage { - Outbound _outbound; + Connection _connection; - public void onConnect(Outbound outbound) + public void onConnect(Connection connection) { // Log.info(this+" onConnect"); - _outbound=outbound; + _connection=connection; _members.add(this); } @@ -48,10 +48,10 @@ public class WebSocketChatServlet extends WebSocketServlet // Log.info(this+" onMessage: "+TypeUtil.toHexString(data,offset,length)); } - public void onMessage(byte frame, String data) + public void onMessage(String data) { if (data.indexOf("disconnect")>=0) - _outbound.disconnect(); + _connection.disconnect(0,""); else { // Log.info(this+" onMessage: "+data); @@ -59,7 +59,7 @@ public class WebSocketChatServlet extends WebSocketServlet { try { - member._outbound.sendMessage(frame,data); + member._connection.sendMessage(data); } catch(IOException e) { @@ -69,14 +69,11 @@ public class WebSocketChatServlet extends WebSocketServlet } } - public void onDisconnect() + public void onDisconnect(int code, String message) { // Log.info(this+" onDisconnect"); _members.remove(this); } - public void onFragment(boolean more, byte opcode, byte[] data, int offset, int length) - { - } } }