From 9528870b239f242655ca07aec28ac7e6c23e13fe Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 1 Sep 2011 10:48:43 +1000 Subject: [PATCH 1/2] improved setting of default message maximums --- .../eclipse/jetty/websocket/WebSocket.java | 10 +++ .../websocket/WebSocketConnectionD00.java | 8 ++- .../websocket/WebSocketConnectionD06.java | 6 ++ .../websocket/WebSocketConnectionD12.java | 36 ++++++---- .../jetty/websocket/WebSocketFactory.java | 42 ++++++++++++ .../jetty/websocket/WebSocketHandler.java | 65 +------------------ .../jetty/websocket/WebSocketServlet.java | 23 +++++-- .../websocket/WebSocketMessageD06Test.java | 4 +- .../websocket/WebSocketMessageD12Test.java | 4 +- 9 files changed, 111 insertions(+), 87 deletions(-) diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java index 958d6a8eaed..e4667f0a2a9 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java @@ -112,6 +112,10 @@ public interface WebSocket void disconnect(); boolean isOpen(); + /* ------------------------------------------------------------ */ + /** + * @param ms The time in ms that the connection can be idle before closing + */ void setMaxIdleTime(int ms); /** @@ -124,6 +128,12 @@ public interface WebSocket */ void setMaxBinaryMessageSize(int size); + /* ------------------------------------------------------------ */ + /** + * @return The time in ms that the connection can be idle before closing + */ + int getMaxIdleTime(); + /** * Size in characters of the maximum text message to be received * @return size <0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java index 0c2b2ee6423..445fd0dc6e7 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java @@ -434,6 +434,11 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc return -1; } + public int getMaxIdleTime() + { + return _endp.getMaxIdleTime(); + } + public int getMaxBinaryMessageSize() { return -1; @@ -554,13 +559,10 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc public void setFakeFragments(boolean fake) { - // TODO Auto-generated method stub - } public boolean isFakeFragments() { - // TODO Auto-generated method stub return false; } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java index a16a37057f6..74395a0a2f1 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java @@ -418,6 +418,12 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc return _maxTextMessage; } + /* ------------------------------------------------------------ */ + public int getMaxIdleTime() + { + return _endp.getMaxIdleTime(); + } + /* ------------------------------------------------------------ */ public int getMaxBinaryMessageSize() { diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD12.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD12.java index 908902ce2a2..d94ae7c6fc3 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD12.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD12.java @@ -95,10 +95,9 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc private volatile String _closeMessage; private volatile boolean _closedIn; private volatile boolean _closedOut; - private int _maxTextMessageSize; + private int _maxTextMessageSize=-1; private int _maxBinaryMessageSize=-1; - static { try @@ -113,9 +112,6 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler(); - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ - /* ------------------------------------------------------------ */ private final WebSocket.FrameConnection _connection = new WSFrameConnection(); @@ -190,9 +186,6 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc {} }; } - - _maxTextMessageSize=buffers.getBufferSize(); - _maxBinaryMessageSize=-1; } /* ------------------------------------------------------------ */ @@ -410,8 +403,6 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc private class WSFrameConnection implements WebSocket.FrameConnection { volatile boolean _disconnecting; - int _maxTextMessage=WebSocketConnectionD12.this._maxTextMessageSize; - int _maxBinaryMessage=WebSocketConnectionD12.this._maxBinaryMessageSize; /* ------------------------------------------------------------ */ public void sendMessage(String content) throws IOException @@ -491,25 +482,31 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ public void setMaxTextMessageSize(int size) { - _maxTextMessage=size; + _maxTextMessageSize=size; } /* ------------------------------------------------------------ */ public void setMaxBinaryMessageSize(int size) { - _maxBinaryMessage=size; + _maxBinaryMessageSize=size; } + /* ------------------------------------------------------------ */ + public int getMaxIdleTime() + { + return _endp.getMaxIdleTime(); + } + /* ------------------------------------------------------------ */ public int getMaxTextMessageSize() { - return _maxTextMessage; + return _maxTextMessageSize; } /* ------------------------------------------------------------ */ public int getMaxBinaryMessageSize() { - return _maxBinaryMessage; + return _maxBinaryMessageSize; } /* ------------------------------------------------------------ */ @@ -728,7 +725,13 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc // No size limit, so handle only final frames if (lastFrame) _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); + else + { + LOG.warn("Frame discarded. Text aggregation disabed for {}",_endp); + _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Text frame aggregation disabled"); + } } + // append bytes to message buffer (if they fit) else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) { if (lastFrame) @@ -764,6 +767,11 @@ public class WebSocketConnectionD12 extends AbstractConnection implements WebSoc _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); _aggregate.put(buffer); } + else + { + LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp); + _connection.close(WebSocketConnectionD12.CLOSE_BADDATA,"Binary frame aggregation disabled"); + } } } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java index 04b3dcd824f..87bcc610cae 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java @@ -68,6 +68,8 @@ public class WebSocketFactory private final Acceptor _acceptor; private WebSocketBuffers _buffers; private int _maxIdleTime = 300000; + private int _maxTextMessageSize = 16*1024; + private int _maxBinaryMessageSize = -1; public WebSocketFactory(Acceptor acceptor) { @@ -130,6 +132,42 @@ public class WebSocketFactory _buffers = new WebSocketBuffers(bufferSize); } + /** + * @return The initial maximum text message size (in characters) for a connection + */ + public int getMaxTextMessageSize() + { + return _maxTextMessageSize; + } + + /** + * Set the initial maximum text message size for a connection. This can be changed by + * the application calling {@link WebSocket.Connection#setMaxTextMessageSize(int)}. + * @param maxTextMessageSize The default maximum text message size (in characters) for a connection + */ + public void setMaxTextMessageSize(int maxTextMessageSize) + { + _maxTextMessageSize = maxTextMessageSize; + } + + /** + * @return The initial maximum binary message size (in bytes) for a connection + */ + public int getMaxBinaryMessageSize() + { + return _maxBinaryMessageSize; + } + + /** + * Set the initial maximum binary message size for a connection. This can be changed by + * the application calling {@link WebSocket.Connection#setMaxBinaryMessageSize(int)}. + * @param maxTextMessageSize The default maximum binary message size (in bytes) for a connection + */ + public void setMaxBinaryMessageSize(int maxBinaryMessageSize) + { + _maxBinaryMessageSize = maxBinaryMessageSize; + } + /** * Upgrade the request/response to a WebSocket Connection. *

This method will not normally return, but will instead throw a @@ -191,6 +229,10 @@ public class WebSocketFactory throw new HttpException(400, "Unsupported draft specification: " + draft); } + // Set the defaults + connection.getConnection().setMaxBinaryMessageSize(_maxBinaryMessageSize); + connection.getConnection().setMaxTextMessageSize(_maxTextMessageSize); + // Let the connection finish processing the handshake connection.handshake(request, response, protocol); response.flushBuffer(); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java index 870fd702018..eb0eefbc3b1 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java @@ -24,70 +24,11 @@ import org.eclipse.jetty.server.handler.HandlerWrapper; public abstract class WebSocketHandler extends HandlerWrapper implements WebSocketFactory.Acceptor { - private WebSocketFactory _webSocketFactory; - private int _bufferSize=64*1024; - private int _maxIdleTime=-1; + private final WebSocketFactory _webSocketFactory=new WebSocketFactory(this,32*1024); - /* ------------------------------------------------------------ */ - /** Get the bufferSize. - * @return the bufferSize - */ - public int getBufferSize() + public WebSocketFactory getWebSocketFactory() { - return _bufferSize; - } - - /* ------------------------------------------------------------ */ - /** Set the bufferSize. - * @param bufferSize the bufferSize to set - */ - public void setBufferSize(int bufferSize) - { - _bufferSize = bufferSize; - } - - /* ------------------------------------------------------------ */ - /** Get the maxIdleTime. - * @return the maxIdleTime - */ - public int getMaxIdleTime() - { - return (int)(_webSocketFactory==null?_maxIdleTime:_webSocketFactory.getMaxIdleTime()); - } - - /* ------------------------------------------------------------ */ - /** Set the maxIdleTime. - * @param maxIdleTime the maxIdleTime to set - */ - public void setMaxIdleTime(int maxIdleTime) - { - _maxIdleTime = maxIdleTime; - if (_webSocketFactory!=null) - _webSocketFactory.setMaxIdleTime(maxIdleTime); - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.server.handler.HandlerWrapper#doStart() - */ - @Override - protected void doStart() throws Exception - { - _webSocketFactory=new WebSocketFactory(this,_bufferSize); - if (_maxIdleTime>=0) - _webSocketFactory.setMaxIdleTime(_maxIdleTime); - super.doStart(); - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.server.handler.HandlerWrapper#doStop() - */ - @Override - protected void doStop() throws Exception - { - super.doStop(); - _webSocketFactory=null; + return _webSocketFactory; } /* ------------------------------------------------------------ */ diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java index b3e9ede719c..562c50cd871 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java @@ -32,7 +32,13 @@ import javax.servlet.http.HttpServletResponse; * which is also the max frame byte size (default 8192). *

* The initParameter "maxIdleTime" can be used to set the time in ms - * that a websocket may be idle before closing (default 300,000). + * that a websocket may be idle before closing. + *

+ * The initParameter "maxTextMessagesSize" can be used to set the size in characters + * that a websocket may be accept before closing. + *

+ * The initParameter "maxBinaryMessagesSize" can be used to set the size in bytes + * that a websocket may be accept before closing. * */ public abstract class WebSocketServlet extends HttpServlet implements WebSocketFactory.Acceptor @@ -48,9 +54,18 @@ public abstract class WebSocketServlet extends HttpServlet implements WebSocketF { String bs=getInitParameter("bufferSize"); _webSocketFactory = new WebSocketFactory(this,bs==null?8192:Integer.parseInt(bs)); - String mit=getInitParameter("maxIdleTime"); - if (mit!=null) - _webSocketFactory.setMaxIdleTime(Integer.parseInt(mit)); + String max=getInitParameter("maxIdleTime"); + if (max!=null) + _webSocketFactory.setMaxIdleTime(Integer.parseInt(max)); + + max=getInitParameter("maxTextMessageSize"); + if (max!=null) + _webSocketFactory.setMaxTextMessageSize(Integer.parseInt(max)); + + max=getInitParameter("maxBinaryMessageSize"); + if (max!=null) + _webSocketFactory.setMaxBinaryMessageSize(Integer.parseInt(max)); + } /* ------------------------------------------------------------ */ diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java index 1521f1a608b..f0e8a4c5ea1 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD06Test.java @@ -53,8 +53,8 @@ public class WebSocketMessageD06Test return _serverWebSocket; } }; - wsHandler.setBufferSize(8192); - wsHandler.setMaxIdleTime(1000); + wsHandler.getWebSocketFactory().setBufferSize(8192); + wsHandler.getWebSocketFactory().setMaxIdleTime(1000); wsHandler.setHandler(new DefaultHandler()); _server.setHandler(wsHandler); _server.start(); diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java index b687c8b2b41..917eff18d13 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD12Test.java @@ -64,8 +64,8 @@ public class WebSocketMessageD12Test return __serverWebSocket; } }; - wsHandler.setBufferSize(8192); - wsHandler.setMaxIdleTime(1000); + wsHandler.getWebSocketFactory().setBufferSize(8192); + wsHandler.getWebSocketFactory().setMaxIdleTime(1000); wsHandler.setHandler(new DefaultHandler()); __server.setHandler(wsHandler); __server.start(); From 0b489b88775e2a1541b2ef6b780f5a77a1f33b7c Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 1 Sep 2011 12:11:10 +1000 Subject: [PATCH 2/2] 356421 Upgraded websocket to draft 13 support --- .../websocket/DeflateFrameExtension.java | 2 +- .../eclipse/jetty/websocket/TestClient.java | 8 +- .../jetty/websocket/WebSocketClient.java | 8 +- .../websocket/WebSocketClientFactory.java | 6 +- .../websocket/WebSocketConnectionD13.java | 884 +++++++++++ .../jetty/websocket/WebSocketFactory.java | 6 +- .../websocket/WebSocketGeneratorD13.java | 234 +++ .../jetty/websocket/WebSocketParserD13.java | 372 +++++ .../jetty/websocket/WebSocketClientTest.java | 16 +- .../websocket/WebSocketGeneratorD13Test.java | 200 +++ .../jetty/websocket/WebSocketLoadD13Test.java | 230 +++ .../websocket/WebSocketMessageD13Test.java | 1307 +++++++++++++++++ .../websocket/WebSocketParserD13Test.java | 359 +++++ 13 files changed, 3611 insertions(+), 21 deletions(-) create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD13.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD13.java create mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13Test.java create mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketLoadD13Test.java create mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD13Test.java create mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD13Test.java diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/DeflateFrameExtension.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/DeflateFrameExtension.java index cd32aafb7c1..e07e9b6da9e 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/DeflateFrameExtension.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/DeflateFrameExtension.java @@ -84,7 +84,7 @@ public class DeflateFrameExtension extends AbstractExtension catch(DataFormatException e) { LOG.warn(e); - getConnection().close(WebSocketConnectionD12.CLOSE_PROTOCOL,e.toString()); + getConnection().close(WebSocketConnectionD13.CLOSE_PROTOCOL,e.toString()); } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java index d218cfec274..80716955347 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java @@ -138,7 +138,7 @@ public class TestClient implements WebSocket.OnFrame { __framesSent++; byte flags= (byte)(off+len==data.length?0x8:0); - byte op=(byte)(off==0?opcode:WebSocketConnectionD12.OP_CONTINUATION); + byte op=(byte)(off==0?opcode:WebSocketConnectionD13.OP_CONTINUATION); if (_verbose) System.err.printf("%s#addFrame %s|%s %s\n",this.getClass().getSimpleName(),TypeUtil.toHexString(flags),TypeUtil.toHexString(op),TypeUtil.toHexString(data,off,len)); @@ -240,11 +240,11 @@ public class TestClient implements WebSocket.OnFrame { long next = System.currentTimeMillis()+delay; - byte opcode=binary?WebSocketConnectionD12.OP_BINARY:WebSocketConnectionD12.OP_TEXT; + byte opcode=binary?WebSocketConnectionD13.OP_BINARY:WebSocketConnectionD13.OP_TEXT; byte data[]=null; - if (opcode==WebSocketConnectionD12.OP_TEXT) + if (opcode==WebSocketConnectionD13.OP_TEXT) { StringBuilder b = new StringBuilder(); while (b.length() _extensions; + private final WebSocketParserD13 _parser; + private final WebSocketParser.FrameHandler _inbound; + private final WebSocketGeneratorD13 _generator; + private final WebSocketGenerator _outbound; + private final WebSocket _webSocket; + private final OnFrame _onFrame; + private final OnBinaryMessage _onBinaryMessage; + private final OnTextMessage _onTextMessage; + private final OnControl _onControl; + private final String _protocol; + private final int _draft; + private final ClassLoader _context; + private volatile int _closeCode; + private volatile String _closeMessage; + private volatile boolean _closedIn; + private volatile boolean _closedOut; + private int _maxTextMessageSize=-1; + private int _maxBinaryMessageSize=-1; + + static + { + try + { + MAGIC="258EAFA5-E914-47DA-95CA-C5AB0DC85B11".getBytes(StringUtil.__ISO_8859_1); + } + catch (UnsupportedEncodingException e) + { + throw new RuntimeException(e); + } + } + + private final WebSocketParser.FrameHandler _frameHandler= new WSFrameHandler(); + + private final WebSocket.FrameConnection _connection = new WSFrameConnection(); + + + /* ------------------------------------------------------------ */ + public WebSocketConnectionD13(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List extensions,int draft) + throws IOException + { + this(websocket,endpoint,buffers,timestamp,maxIdleTime,protocol,extensions,draft,null); + } + + /* ------------------------------------------------------------ */ + public WebSocketConnectionD13(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, List extensions,int draft, MaskGen maskgen) + throws IOException + { + super(endpoint,timestamp); + + _context=Thread.currentThread().getContextClassLoader(); + + if (endpoint instanceof AsyncEndPoint) + ((AsyncEndPoint)endpoint).cancelIdle(); + + _draft=draft; + _endp.setMaxIdleTime(maxIdleTime); + + _webSocket = websocket; + _onFrame=_webSocket instanceof OnFrame ? (OnFrame)_webSocket : null; + _onTextMessage=_webSocket instanceof OnTextMessage ? (OnTextMessage)_webSocket : null; + _onBinaryMessage=_webSocket instanceof OnBinaryMessage ? (OnBinaryMessage)_webSocket : null; + _onControl=_webSocket instanceof OnControl ? (OnControl)_webSocket : null; + _generator = new WebSocketGeneratorD13(buffers, _endp,maskgen); + + _extensions=extensions; + if (_extensions!=null) + { + int e=0; + for (Extension extension : _extensions) + { + extension.bind( + _connection, + e==extensions.size()-1?_frameHandler:extensions.get(e+1), + e==0?_generator:extensions.get(e-1)); + e++; + } + } + + _outbound=(_extensions==null||_extensions.size()==0)?_generator:extensions.get(extensions.size()-1); + _inbound=(_extensions==null||_extensions.size()==0)?_frameHandler:extensions.get(0); + + _parser = new WebSocketParserD13(buffers, endpoint,_inbound,maskgen==null); + + _protocol=protocol; + + // TODO should these be AsyncEndPoint checks/calls? + if (_endp instanceof SelectChannelEndPoint) + { + final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp; + scep.cancelIdle(); + _idle=new IdleCheck() + { + public void access(EndPoint endp) + { + scep.scheduleIdle(); + } + }; + scep.scheduleIdle(); + } + else + { + _idle = new IdleCheck() + { + public void access(EndPoint endp) + {} + }; + } + } + + /* ------------------------------------------------------------ */ + public WebSocket.Connection getConnection() + { + return _connection; + } + + /* ------------------------------------------------------------ */ + public List getExtensions() + { + if (_extensions==null) + return Collections.emptyList(); + + return _extensions; + } + + /* ------------------------------------------------------------ */ + public Connection handle() throws IOException + { + Thread current = Thread.currentThread(); + ClassLoader oldcontext = current.getContextClassLoader(); + current.setContextClassLoader(_context); + try + { + // handle the framing protocol + boolean progress=true; + + while (progress) + { + int flushed=_generator.flushBuffer(); + int filled=_parser.parseNext(); + + progress = flushed>0 || filled>0; + + if (filled<0 || flushed<0) + { + _endp.close(); + break; + } + } + } + catch(IOException e) + { + try + { + _endp.close(); + } + catch(IOException e2) + { + LOG.ignore(e2); + } + throw e; + } + finally + { + current.setContextClassLoader(oldcontext); + _parser.returnBuffer(); + _generator.returnBuffer(); + if (_endp.isOpen()) + { + _idle.access(_endp); + if (_closedIn && _closedOut && _outbound.isBufferEmpty()) + _endp.close(); + else if (_endp.isInputShutdown() && !_closedIn) + closeIn(CLOSE_NO_CLOSE,null); + else + checkWriteable(); + } + } + return this; + } + + /* ------------------------------------------------------------ */ + public boolean isIdle() + { + return _parser.isBufferEmpty() && _outbound.isBufferEmpty(); + } + + /* ------------------------------------------------------------ */ + @Override + public void idleExpired() + { + long idle = System.currentTimeMillis()-((SelectChannelEndPoint)_endp).getIdleTimestamp(); + closeOut(WebSocketConnectionD13.CLOSE_NORMAL,"Idle for "+idle+"ms > "+_endp.getMaxIdleTime()+"ms"); + } + + /* ------------------------------------------------------------ */ + public boolean isSuspended() + { + return false; + } + + /* ------------------------------------------------------------ */ + public void closed() + { + final boolean closed; + synchronized (this) + { + closed=_closeCode==0; + if (closed) + _closeCode=WebSocketConnectionD13.CLOSE_NO_CLOSE; + } + if (closed) + _webSocket.onClose(WebSocketConnectionD13.CLOSE_NO_CLOSE,"closed"); + } + + /* ------------------------------------------------------------ */ + public void closeIn(int code,String message) + { + LOG.debug("ClosedIn {} {}",this,message); + + final boolean closedOut; + final boolean closed; + synchronized (this) + { + closedOut=_closedOut; + _closedIn=true; + closed=_closeCode==0; + if (closed) + { + _closeCode=code; + _closeMessage=message; + } + } + + try + { + if (closed) + _webSocket.onClose(code,message); + } + finally + { + try + { + if (closedOut) + _endp.close(); + else + closeOut(code,message); + } + catch(IOException e) + { + LOG.ignore(e); + } + } + } + + /* ------------------------------------------------------------ */ + public void closeOut(int code,String message) + { + LOG.debug("ClosedOut {} {}",this,message); + + final boolean close; + final boolean closed; + synchronized (this) + { + close=_closedIn || _closedOut; + _closedOut=true; + closed=_closeCode==0; + if (closed) + { + _closeCode=code; + _closeMessage=message; + } + } + + try + { + if (closed) + _webSocket.onClose(code,message); + } + finally + { + try + { + if (close) + _endp.close(); + else + { + if (code<=0) + code=WebSocketConnectionD13.CLOSE_NORMAL; + byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1); + bytes[0]=(byte)(code/0x100); + bytes[1]=(byte)(code%0x100); + _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_CLOSE,bytes,0,bytes.length); + } + _outbound.flush(); + + } + catch(IOException e) + { + LOG.ignore(e); + } + } + } + + /* ------------------------------------------------------------ */ + public void fillBuffersFrom(Buffer buffer) + { + _parser.fill(buffer); + } + + /* ------------------------------------------------------------ */ + private void checkWriteable() + { + if (!_outbound.isBufferEmpty() && _endp instanceof AsyncEndPoint) + { + ((AsyncEndPoint)_endp).scheduleWrite(); + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private class WSFrameConnection implements WebSocket.FrameConnection + { + volatile boolean _disconnecting; + + /* ------------------------------------------------------------ */ + public void sendMessage(String content) throws IOException + { + if (_closedOut) + throw new IOException("closedOut "+_closeCode+":"+_closeMessage); + byte[] data = content.getBytes(StringUtil.__UTF8); + _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_TEXT,data,0,data.length); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public void sendMessage(byte[] content, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closedOut "+_closeCode+":"+_closeMessage); + _outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_BINARY,content,offset,length); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closedOut "+_closeCode+":"+_closeMessage); + _outbound.addFrame(flags,opcode,content,offset,length); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public void sendControl(byte ctrl, byte[] data, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closedOut "+_closeCode+":"+_closeMessage); + _outbound.addFrame((byte)FLAG_FIN,ctrl,data,offset,length); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public boolean isMessageComplete(byte flags) + { + return isLastFrame(flags); + } + + /* ------------------------------------------------------------ */ + public boolean isOpen() + { + return _endp!=null&&_endp.isOpen(); + } + + /* ------------------------------------------------------------ */ + public void close(int code, String message) + { + if (_disconnecting) + return; + _disconnecting=true; + WebSocketConnectionD13.this.closeOut(code,message); + } + + /* ------------------------------------------------------------ */ + public void setMaxIdleTime(int ms) + { + try + { + _endp.setMaxIdleTime(ms); + } + catch(IOException e) + { + LOG.warn(e); + } + } + + /* ------------------------------------------------------------ */ + public void setMaxTextMessageSize(int size) + { + _maxTextMessageSize=size; + } + + /* ------------------------------------------------------------ */ + public void setMaxBinaryMessageSize(int size) + { + _maxBinaryMessageSize=size; + } + + /* ------------------------------------------------------------ */ + public int getMaxIdleTime() + { + return _endp.getMaxIdleTime(); + } + + /* ------------------------------------------------------------ */ + public int getMaxTextMessageSize() + { + return _maxTextMessageSize; + } + + /* ------------------------------------------------------------ */ + public int getMaxBinaryMessageSize() + { + return _maxBinaryMessageSize; + } + + /* ------------------------------------------------------------ */ + public String getProtocol() + { + return _protocol; + } + + /* ------------------------------------------------------------ */ + public byte binaryOpcode() + { + return OP_BINARY; + } + + /* ------------------------------------------------------------ */ + public byte textOpcode() + { + return OP_TEXT; + } + + /* ------------------------------------------------------------ */ + public byte continuationOpcode() + { + return OP_CONTINUATION; + } + + /* ------------------------------------------------------------ */ + public byte finMask() + { + return FLAG_FIN; + } + + /* ------------------------------------------------------------ */ + public boolean isControl(byte opcode) + { + return isControlFrame(opcode); + } + + /* ------------------------------------------------------------ */ + public boolean isText(byte opcode) + { + return opcode==OP_TEXT; + } + + /* ------------------------------------------------------------ */ + public boolean isBinary(byte opcode) + { + return opcode==OP_BINARY; + } + + /* ------------------------------------------------------------ */ + public boolean isContinuation(byte opcode) + { + return opcode==OP_CONTINUATION; + } + + /* ------------------------------------------------------------ */ + public boolean isClose(byte opcode) + { + return opcode==OP_CLOSE; + } + + /* ------------------------------------------------------------ */ + public boolean isPing(byte opcode) + { + return opcode==OP_PING; + } + + /* ------------------------------------------------------------ */ + public boolean isPong(byte opcode) + { + return opcode==OP_PONG; + } + + /* ------------------------------------------------------------ */ + public void disconnect() + { + close(CLOSE_NORMAL,null); + } + + /* ------------------------------------------------------------ */ + public void setFakeFragments(boolean fake) + { + _parser.setFakeFragments(fake); + } + + /* ------------------------------------------------------------ */ + public boolean isFakeFragments() + { + return _parser.isFakeFragments(); + } + + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private class WSFrameHandler implements WebSocketParser.FrameHandler + { + private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + private ByteArrayBuffer _aggregate; + private byte _opcode=-1; + + public void onFrame(final byte flags, final byte opcode, final Buffer buffer) + { + boolean lastFrame = isLastFrame(flags); + + synchronized(WebSocketConnectionD13.this) + { + // Ignore incoming after a close + if (_closedIn) + return; + } + try + { + byte[] array=buffer.array(); + + // Deliver frame if websocket is a FrameWebSocket + if (_onFrame!=null) + { + if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + if (_onControl!=null && isControlFrame(opcode)) + { + if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + switch(opcode) + { + case WebSocketConnectionD13.OP_CONTINUATION: + { + // If text, append to the message buffer + if (_onTextMessage!=null && _opcode==WebSocketConnectionD13.OP_TEXT) + { + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + { + // If this is the last fragment, deliver the text buffer + if (lastFrame) + { + _opcode=-1; + String msg =_utf8.toString(); + _utf8.reset(); + _onTextMessage.onMessage(msg); + } + } + else + textMessageTooLarge(); + } + + if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) + { + if (checkBinaryMessageSize(_aggregate.length(),buffer.length())) + { + _aggregate.put(buffer); + + // If this is the last fragment, deliver + if (lastFrame && _onBinaryMessage!=null) + { + try + { + _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); + } + finally + { + _opcode=-1; + _aggregate.clear(); + } + } + } + } + break; + } + case WebSocketConnectionD13.OP_PING: + { + LOG.debug("PING {}",this); + if (!_closedOut) + _connection.sendControl(WebSocketConnectionD13.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); + break; + } + + case WebSocketConnectionD13.OP_PONG: + { + LOG.debug("PONG {}",this); + break; + } + + case WebSocketConnectionD13.OP_CLOSE: + { + int code=WebSocketConnectionD13.CLOSE_NO_CODE; + String message=null; + if (buffer.length()>=2) + { + code=buffer.array()[buffer.getIndex()]*0x100+buffer.array()[buffer.getIndex()+1]; + if (buffer.length()>2) + message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8); + } + closeIn(code,message); + break; + } + + case WebSocketConnectionD13.OP_TEXT: + { + if(_onTextMessage!=null) + { + if (_connection.getMaxTextMessageSize()<=0) + { + // No size limit, so handle only final frames + if (lastFrame) + _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); + else + { + LOG.warn("Frame discarded. Text aggregation disabed for {}",_endp); + _connection.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"Text frame aggregation disabled"); + } + } + // append bytes to message buffer (if they fit) + else if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + { + if (lastFrame) + { + String msg =_utf8.toString(); + _utf8.reset(); + _onTextMessage.onMessage(msg); + } + else + { + _opcode=WebSocketConnectionD13.OP_TEXT; + } + } + else + textMessageTooLarge(); + } + break; + } + + default: + { + if (_onBinaryMessage!=null && checkBinaryMessageSize(0,buffer.length())) + { + if (lastFrame) + { + _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); + } + else if (_connection.getMaxBinaryMessageSize()>=0) + { + _opcode=opcode; + // TODO use a growing buffer rather than a fixed one. + if (_aggregate==null) + _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); + _aggregate.put(buffer); + } + else + { + LOG.warn("Frame discarded. Binary aggregation disabed for {}",_endp); + _connection.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"Binary frame aggregation disabled"); + } + } + } + } + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + LOG.warn(th); + } + } + + private boolean checkBinaryMessageSize(int bufferLen, int length) + { + int max = _connection.getMaxBinaryMessageSize(); + if (max>0 && (bufferLen+length)>max) + { + LOG.warn("Binary message too large > {}B for {}",_connection.getMaxBinaryMessageSize(),_endp); + _connection.close(WebSocketConnectionD13.CLOSE_MESSAGE_TOO_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + _opcode=-1; + if (_aggregate!=null) + _aggregate.clear(); + return false; + } + return true; + } + + private void textMessageTooLarge() + { + LOG.warn("Text message too large > {} chars for {}",_connection.getMaxTextMessageSize(),_endp); + _connection.close(WebSocketConnectionD13.CLOSE_MESSAGE_TOO_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); + + _opcode=-1; + _utf8.reset(); + } + + public void close(int code,String message) + { + if (code!=CLOSE_NORMAL) + LOG.warn("Close: "+code+" "+message); + _connection.close(code,message); + } + + @Override + public String toString() + { + return WebSocketConnectionD13.this.toString()+"FH"; + } + } + + /* ------------------------------------------------------------ */ + private interface IdleCheck + { + void access(EndPoint endp); + } + + /* ------------------------------------------------------------ */ + public void handshake(HttpServletRequest request, HttpServletResponse response, String subprotocol) throws IOException + { + String uri=request.getRequestURI(); + String query=request.getQueryString(); + if (query!=null && query.length()>0) + uri+="?"+query; + String key = request.getHeader("Sec-WebSocket-Key"); + + response.setHeader("Upgrade","WebSocket"); + response.addHeader("Connection","Upgrade"); + response.addHeader("Sec-WebSocket-Accept",hashKey(key)); + if (subprotocol!=null) + response.addHeader("Sec-WebSocket-Protocol",subprotocol); + + for(Extension ext : _extensions) + response.addHeader("Sec-WebSocket-Extensions",ext.getParameterizedName()); + + response.sendError(101); + + if (_onFrame!=null) + _onFrame.onHandshake(_connection); + _webSocket.onOpen(_connection); + } + + /* ------------------------------------------------------------ */ + public static String hashKey(String key) + { + try + { + MessageDigest md = MessageDigest.getInstance("SHA1"); + md.update(key.getBytes("UTF-8")); + md.update(MAGIC); + return new String(B64Code.encode(md.digest())); + } + catch (Exception e) + { + throw new RuntimeException(e); + } + } + + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + return "WS/D"+_draft+"-"+_endp; + } +} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java index 87bcc610cae..a2798bf9988 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java @@ -221,9 +221,13 @@ public class WebSocketFactory case 10: case 11: case 12: - extensions= initExtensions(extensions_requested,8-WebSocketConnectionD12.OP_EXT_DATA, 16-WebSocketConnectionD12.OP_EXT_CTRL,3); + extensions= initExtensions(extensions_requested,8-WebSocketConnectionD12.OP_EXT_DATA, 16-WebSocketConnectionD13.OP_EXT_CTRL,3); connection = new WebSocketConnectionD12(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol,extensions,draft); break; + case 13: + extensions= initExtensions(extensions_requested,8-WebSocketConnectionD13.OP_EXT_DATA, 16-WebSocketConnectionD13.OP_EXT_CTRL,3); + connection = new WebSocketConnectionD13(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol,extensions,draft); + break; default: LOG.warn("Unsupported Websocket version: "+draft); throw new HttpException(400, "Unsupported draft specification: " + draft); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13.java new file mode 100644 index 00000000000..003d711ca99 --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13.java @@ -0,0 +1,234 @@ +// ======================================================================== +// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.websocket; + +import java.io.IOException; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.EofException; + + +/* ------------------------------------------------------------ */ +/** WebSocketGenerator. + * This class generates websocket packets. + * It is fully synchronized because it is likely that async + * threads will call the addMessage methods while other + * threads are flushing the generator. + */ +public class WebSocketGeneratorD13 implements WebSocketGenerator +{ + final private WebSocketBuffers _buffers; + final private EndPoint _endp; + private Buffer _buffer; + private final byte[] _mask=new byte[4]; + private int _m; + private boolean _opsent; + private final MaskGen _maskGen; + + public WebSocketGeneratorD13(WebSocketBuffers buffers, EndPoint endp) + { + _buffers=buffers; + _endp=endp; + _maskGen=null; + } + + public WebSocketGeneratorD13(WebSocketBuffers buffers, EndPoint endp, MaskGen maskGen) + { + _buffers=buffers; + _endp=endp; + _maskGen=maskGen; + } + + public synchronized Buffer getBuffer() + { + return _buffer; + } + + public synchronized void addFrame(byte flags, byte opcode, byte[] content, int offset, int length) throws IOException + { + // System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length); + + boolean mask=_maskGen!=null; + + if (_buffer==null) + _buffer=mask?_buffers.getBuffer():_buffers.getDirectBuffer(); + + boolean last=WebSocketConnectionD13.isLastFrame(flags); + byte orig=opcode; + + int space=mask?14:10; + + do + { + opcode = _opsent?WebSocketConnectionD13.OP_CONTINUATION:opcode; + opcode=(byte)(((0xf&flags)<<4)+(0xf&opcode)); + _opsent=true; + + int payload=length; + if (payload+space>_buffer.capacity()) + { + // We must fragement, so clear FIN bit + opcode=(byte)(opcode&0x7F); // Clear the FIN bit + payload=_buffer.capacity()-space; + } + else if (last) + opcode= (byte)(opcode|0x80); // Set the FIN bit + + // ensure there is space for header + if (_buffer.space() <= space) + { + flushBuffer(); + if (_buffer.space() <= space) + flush(); + } + + // write the opcode and length + if (payload>0xffff) + { + _buffer.put(new byte[]{ + opcode, + mask?(byte)0xff:(byte)0x7f, + (byte)((payload>>56)&0x7f), + (byte)((payload>>48)&0xff), + (byte)((payload>>40)&0xff), + (byte)((payload>>32)&0xff), + (byte)((payload>>24)&0xff), + (byte)((payload>>16)&0xff), + (byte)((payload>>8)&0xff), + (byte)(payload&0xff)}); + } + else if (payload >=0x7e) + { + _buffer.put(new byte[]{ + opcode, + mask?(byte)0xfe:(byte)0x7e, + (byte)(payload>>8), + (byte)(payload&0xff)}); + } + else + { + _buffer.put(new byte[]{ + opcode, + (byte)(mask?(0x80|payload):payload)}); + } + + // write mask + if (mask) + { + _maskGen.genMask(_mask); + _m=0; + _buffer.put(_mask); + } + + + // write payload + int remaining = payload; + while (remaining > 0) + { + _buffer.compact(); + int chunk = remaining < _buffer.space() ? remaining : _buffer.space(); + + if (mask) + { + for (int i=0;i 0) + { + // Gently flush the data, issuing a non-blocking write + flushBuffer(); + } + else + { + // Forcibly flush the data, issuing a blocking write + flush(); + if (remaining == 0) + { + // Gently flush the data, issuing a non-blocking write + flushBuffer(); + } + } + } + offset+=payload; + length-=payload; + } + while (length>0); + _opsent=!last; + + if (_buffer!=null && _buffer.length()==0) + { + _buffers.returnBuffer(_buffer); + _buffer=null; + } + } + + public synchronized int flushBuffer() throws IOException + { + if (!_endp.isOpen()) + throw new EofException(); + + if (_buffer!=null) + return _endp.flush(_buffer); + + return 0; + } + + public synchronized int flush() throws IOException + { + if (_buffer==null) + return 0; + int result = flushBuffer(); + + if (!_endp.isBlocking()) + { + long now = System.currentTimeMillis(); + long end=now+_endp.getMaxIdleTime(); + while (_buffer.length()>0) + { + boolean ready = _endp.blockWritable(end-now); + if (!ready) + { + now = System.currentTimeMillis(); + if (now>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length()); + events++; + _bytesNeeded-=data.length(); + _handler.onFrame((byte)(_flags&(0xff^WebSocketConnectionD13.FLAG_FIN)), _opcode, data); + + _opcode=WebSocketConnectionD13.OP_CONTINUATION; + } + + if (_buffer.space() == 0) + throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity()); + } + + // catch IOExceptions (probably EOF) and try to parse what we have + try + { + int filled=_endp.isOpen()?_endp.fill(_buffer):-1; + if (filled<=0) + return (total_filled+events)>0?(total_filled+events):filled; + total_filled+=filled; + available=_buffer.length(); + } + catch(IOException e) + { + LOG.debug(e); + return (total_filled+events)>0?(total_filled+events):-1; + } + } + + // if we are here, then we have sufficient bytes to process the current state. + + // Parse the buffer byte by byte (unless it is STATE_DATA) + byte b; + while (_state!=State.DATA && available>=(_state==State.SKIP?1:_bytesNeeded)) + { + switch (_state) + { + case START: + _skip=false; + _state=State.OPCODE; + _bytesNeeded=_state.getNeeds(); + continue; + + case OPCODE: + b=_buffer.get(); + available--; + _opcode=(byte)(b&0xf); + _flags=(byte)(0xf&(b>>4)); + + if (WebSocketConnectionD13.isControlFrame(_opcode)&&!WebSocketConnectionD13.isLastFrame(_flags)) + { + events++; + LOG.warn("Fragmented Control from "+_endp); + _handler.close(WebSocketConnectionD13.CLOSE_PROTOCOL,"Fragmented control"); + _skip=true; + } + + _state=State.LENGTH_7; + _bytesNeeded=_state.getNeeds(); + + continue; + + case LENGTH_7: + b=_buffer.get(); + available--; + _masked=(b&0x80)!=0; + b=(byte)(0x7f&b); + + switch(b) + { + case 0x7f: + _length=0; + _state=State.LENGTH_63; + break; + case 0x7e: + _length=0; + _state=State.LENGTH_16; + break; + default: + _length=(0x7f&b); + _state=_masked?State.MASK:State.PAYLOAD; + } + _bytesNeeded=_state.getNeeds(); + continue; + + case LENGTH_16: + b=_buffer.get(); + available--; + _length = _length*0x100 + (0xff&b); + if (--_bytesNeeded==0) + { + if (_length>_buffer.capacity() && !_fakeFragments) + { + events++; + _handler.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"frame size "+_length+">"+_buffer.capacity()); + _skip=true; + } + + _state=_masked?State.MASK:State.PAYLOAD; + _bytesNeeded=_state.getNeeds(); + } + continue; + + case LENGTH_63: + b=_buffer.get(); + available--; + _length = _length*0x100 + (0xff&b); + if (--_bytesNeeded==0) + { + _bytesNeeded=(int)_length; + if (_length>=_buffer.capacity()) + { + events++; + _handler.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"frame size "+_length+">"+_buffer.capacity()); + _skip=true; + } + + _state=_masked?State.MASK:State.PAYLOAD; + _bytesNeeded=_state.getNeeds(); + } + continue; + + case MASK: + _buffer.get(_mask,0,4); + _m=0; + available-=4; + _state=State.PAYLOAD; + _bytesNeeded=_state.getNeeds(); + break; + + case PAYLOAD: + _bytesNeeded=(int)_length; + _state=_skip?State.SKIP:State.DATA; + break; + + case DATA: + break; + + case SKIP: + int skip=Math.min(available,_bytesNeeded); + _buffer.skip(skip); + available-=skip; + _bytesNeeded-=skip; + if (_bytesNeeded==0) + _state=State.START; + + } + } + + if (_state==State.DATA && available>=_bytesNeeded) + { + if ( _masked!=_shouldBeMasked) + { + _buffer.skip(_bytesNeeded); + _state=State.START; + events++; + _handler.close(WebSocketConnectionD13.CLOSE_PROTOCOL,"bad mask"); + } + else + { + Buffer data =_buffer.get(_bytesNeeded); + if (_masked) + { + if (data.array()==null) + data=_buffer.asMutableBuffer(); + byte[] array = data.array(); + final int end=data.putIndex(); + for (int i=data.getIndex();i>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length()); + events++; + _handler.onFrame(_flags, _opcode, data); + _bytesNeeded=0; + _state=State.START; + } + + return total_filled+events; + } + } + } + + /* ------------------------------------------------------------ */ + public void fill(Buffer buffer) + { + if (buffer!=null && buffer.length()>0) + { + if (_buffer==null) + _buffer=_buffers.getBuffer(); + + _buffer.put(buffer); + buffer.clear(); + } + } + + /* ------------------------------------------------------------ */ + public void returnBuffer() + { + if (_buffer!=null && _buffer.length()==0) + { + _buffers.returnBuffer(_buffer); + _buffer=null; + } + } + + /* ------------------------------------------------------------ */ + @Override + public String toString() + { + Buffer buffer=_buffer; + return WebSocketParserD13.class.getSimpleName()+"@"+ Integer.toHexString(hashCode())+"|"+_state+"|"+(buffer==null?"<>":buffer.toDetailString()); + } + +} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketClientTest.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketClientTest.java index d6c1ca8de24..4268e02f858 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketClientTest.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketClientTest.java @@ -167,7 +167,7 @@ public class WebSocketClientTest } Assert.assertFalse(open.get()); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_NO_CLOSE,close.get()); Assert.assertTrue(error instanceof ConnectException); } @@ -207,7 +207,7 @@ public class WebSocketClientTest } Assert.assertFalse(open.get()); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_NO_CLOSE,close.get()); Assert.assertTrue(error instanceof TimeoutException); } @@ -246,7 +246,7 @@ public class WebSocketClientTest } Assert.assertFalse(open.get()); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_NO_CLOSE,close.get()); Assert.assertTrue(error instanceof TimeoutException); } @@ -287,7 +287,7 @@ public class WebSocketClientTest } Assert.assertFalse(open.get()); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_PROTOCOL,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_PROTOCOL,close.get()); Assert.assertTrue(error instanceof IOException); Assert.assertTrue(error.getMessage().indexOf("404 NOT FOUND")>0); @@ -330,7 +330,7 @@ public class WebSocketClientTest error=e.getCause(); } Assert.assertFalse(open.get()); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_PROTOCOL,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_PROTOCOL,close.get()); Assert.assertTrue(error instanceof IOException); Assert.assertTrue(error.getMessage().indexOf("Bad Sec-WebSocket-Accept")>=0); } @@ -368,7 +368,7 @@ public class WebSocketClientTest socket.close(); _latch.await(10,TimeUnit.SECONDS); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_NOCLOSE,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_NO_CLOSE,close.get()); } @@ -406,7 +406,7 @@ public class WebSocketClientTest long start=System.currentTimeMillis(); _latch.await(10,TimeUnit.SECONDS); Assert.assertTrue(System.currentTimeMillis()-start<5000); - Assert.assertEquals(WebSocketConnectionD12.CLOSE_NORMAL,close.get()); + Assert.assertEquals(WebSocketConnectionD13.CLOSE_NORMAL,close.get()); } @@ -742,7 +742,7 @@ public class WebSocketClientTest } connection.getOutputStream().write(( "HTTP/1.1 101 Upgrade\r\n" + - "Sec-WebSocket-Accept: "+ WebSocketConnectionD12.hashKey(key) +"\r\n" + + "Sec-WebSocket-Accept: "+ WebSocketConnectionD13.hashKey(key) +"\r\n" + "\r\n").getBytes()); } } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13Test.java new file mode 100644 index 00000000000..bc0585447e7 --- /dev/null +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD13Test.java @@ -0,0 +1,200 @@ +package org.eclipse.jetty.websocket; + +import static junit.framework.Assert.assertEquals; + +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.util.StringUtil; +import org.junit.Before; +import org.junit.Test; + +/** + * @version $Revision$ $Date$ + */ +public class WebSocketGeneratorD13Test +{ + private ByteArrayBuffer _out; + private WebSocketGenerator _generator; + ByteArrayEndPoint _endPoint; + WebSocketBuffers _buffers; + byte[] _mask = new byte[4]; + int _m; + + public MaskGen _maskGen = new FixedMaskGen( + new byte[]{(byte)0x00,(byte)0x00,(byte)0x0f,(byte)0xff}); + + @Before + public void setUp() throws Exception + { + _endPoint = new ByteArrayEndPoint(); + _out = new ByteArrayBuffer(2048); + _endPoint.setOut(_out); + _buffers = new WebSocketBuffers(1024); + _m=0; + } + + byte getMasked() + { + return (byte)(_out.get()^_mask[_m++%4]); + } + + + @Test + public void testOneString() throws Exception + { + _generator = new WebSocketGeneratorD13(_buffers, _endPoint,null); + + byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x8,(byte)0x04,data,0,data.length); + _generator.flush(); + assertEquals((byte)0x84,_out.get()); + assertEquals(15,0xff&_out.get()); + assertEquals('H',_out.get()); + assertEquals('e',_out.get()); + assertEquals('l',_out.get()); + assertEquals('l',_out.get()); + assertEquals(0xEF,0xff&_out.get()); + assertEquals(0xBD,0xff&_out.get()); + assertEquals(0x8F,0xff&_out.get()); + assertEquals(' ',_out.get()); + assertEquals('W',_out.get()); + assertEquals(0xEF,0xff&_out.get()); + assertEquals(0xBD,0xff&_out.get()); + assertEquals(0x8F,0xff&_out.get()); + assertEquals('r',_out.get()); + assertEquals('l',_out.get()); + assertEquals('d',_out.get()); + } + + @Test + public void testOneBuffer() throws Exception + { + _generator = new WebSocketGeneratorD13(_buffers, _endPoint,null); + + String string = "Hell\uFF4F W\uFF4Frld"; + byte[] bytes=string.getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x8,(byte)0x04,bytes,0,bytes.length); + _generator.flush(); + assertEquals((byte)0x84,_out.get()); + assertEquals(15,0xff&_out.get()); + assertEquals('H',_out.get()); + assertEquals('e',_out.get()); + assertEquals('l',_out.get()); + assertEquals('l',_out.get()); + assertEquals(0xEF,0xff&_out.get()); + assertEquals(0xBD,0xff&_out.get()); + assertEquals(0x8F,0xff&_out.get()); + assertEquals(' ',_out.get()); + assertEquals('W',_out.get()); + assertEquals(0xEF,0xff&_out.get()); + assertEquals(0xBD,0xff&_out.get()); + assertEquals(0x8F,0xff&_out.get()); + assertEquals('r',_out.get()); + assertEquals('l',_out.get()); + assertEquals('d',_out.get()); + } + + @Test + public void testOneLongBuffer() throws Exception + { + _generator = new WebSocketGeneratorD13(_buffers, _endPoint,null); + + byte[] b=new byte[150]; + for (int i=0;i> "+data); + outbound.sendMessage(data); + } + catch (IOException x) + { + outbound.disconnect(); + } + } + + public void onClose(int closeCode, String message) + { + } + } + + private class WebSocketClient implements Runnable + { + private final Socket socket; + private final BufferedWriter output; + private final BufferedReader input; + private final int iterations; + private final CountDownLatch latch; + private final SocketEndPoint _endp; + private final WebSocketGeneratorD13 _generator; + private final WebSocketParserD13 _parser; + private final WebSocketParser.FrameHandler _handler = new WebSocketParser.FrameHandler() + { + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + _response=buffer; + } + + public void close(int code,String message) + { + } + }; + private volatile Buffer _response; + + public WebSocketClient(String host, int port, int readTimeout, CountDownLatch latch, int iterations) throws IOException + { + this.latch = latch; + socket = new Socket(host, port); + socket.setSoTimeout(readTimeout); + output = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream(), "ISO-8859-1")); + input = new BufferedReader(new InputStreamReader(socket.getInputStream(), "ISO-8859-1")); + this.iterations = iterations; + + _endp=new SocketEndPoint(socket); + _generator = new WebSocketGeneratorD13(new WebSocketBuffers(32*1024),_endp,new FixedMaskGen()); + _parser = new WebSocketParserD13(new WebSocketBuffers(32*1024),_endp,_handler,false); + + } + + private void open() throws IOException + { + output.write("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: onConnect\r\n" + + "Sec-WebSocket-Version: 7\r\n"+ + "\r\n"); + output.flush(); + + String responseLine = input.readLine(); + assertTrue(responseLine.startsWith("HTTP/1.1 101 Switching Protocols")); + // Read until we find an empty line, which signals the end of the http response + String line; + while ((line = input.readLine()) != null) + if (line.length() == 0) + break; + } + + public void run() + { + try + { + String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF"; + for (int i = 0; i < iterations; ++i) + { + byte[] data = message.getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x8,WebSocketConnectionD13.OP_TEXT,data,0,data.length); + _generator.flush(); + + //System.err.println("-> "+message); + + _response=null; + while(_response==null) + _parser.parseNext(); + //System.err.println("<- "+_response); + Assert.assertEquals(message,_response.toString()); + latch.countDown(); + } + } + catch (IOException x) + { + throw new RuntimeException(x); + } + } + + + public void close() throws IOException + { + socket.close(); + } + } +} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD13Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD13Test.java new file mode 100644 index 00000000000..34d728c44ae --- /dev/null +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD13Test.java @@ -0,0 +1,1307 @@ +package org.eclipse.jetty.websocket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.net.Socket; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; +import java.util.zip.Deflater; +import java.util.zip.Inflater; + +import javax.servlet.http.HttpServletRequest; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.server.Connector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.server.nio.SelectChannelConnector; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.TypeUtil; +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * @version $Revision$ $Date$ + */ +public class WebSocketMessageD13Test +{ + private static Server __server; + private static Connector __connector; + private static TestWebSocket __serverWebSocket; + private static CountDownLatch __latch; + private static AtomicInteger __textCount = new AtomicInteger(0); + + @BeforeClass + public static void startServer() throws Exception + { + __server = new Server(); + __connector = new SelectChannelConnector(); + __server.addConnector(__connector); + WebSocketHandler wsHandler = new WebSocketHandler() + { + public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) + { + __textCount.set(0); + __serverWebSocket = new TestWebSocket(); + __serverWebSocket._onConnect=("onConnect".equals(protocol)); + __serverWebSocket._echo=("echo".equals(protocol)); + __serverWebSocket._aggregate=("aggregate".equals(protocol)); + __serverWebSocket._latch=("latch".equals(protocol)); + if (__serverWebSocket._latch) + __latch=new CountDownLatch(1); + return __serverWebSocket; + } + }; + wsHandler.getWebSocketFactory().setBufferSize(8192); + wsHandler.getWebSocketFactory().setMaxIdleTime(1000); + wsHandler.setHandler(new DefaultHandler()); + __server.setHandler(wsHandler); + __server.start(); + } + + @AfterClass + public static void stopServer() throws Exception + { + __server.stop(); + __server.join(); + } + + + @Test + public void testHash() + { + assertEquals("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",WebSocketConnectionD13.hashKey("dGhlIHNhbXBsZSBub25jZQ==")); + } + + @Test + public void testServerSendBigStringMessage() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: chat, superchat\r\n"+ + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(1000); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + // Server sends a big message + StringBuilder message = new StringBuilder(); + String text = "0123456789ABCDEF"; + for (int i = 0; i < (0x2000) / text.length(); i++) + message.append(text); + String data=message.toString(); + __serverWebSocket.connection.sendMessage(data); + + assertEquals(WebSocketConnectionD13.OP_TEXT,input.read()); + assertEquals(0x7e,input.read()); + assertEquals(0x1f,input.read()); + assertEquals(0xf6,input.read()); + lookFor(data.substring(0,0x1ff6),input); + assertEquals(0x80,input.read()); + assertEquals(0x0A,input.read()); + lookFor(data.substring(0x1ff6),input); + } + + @Test + public void testServerSendOnConnect() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: onConnect\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(1000); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + assertEquals(0x81,input.read()); + assertEquals(0x0f,input.read()); + lookFor("sent on connect",input); + } + + @Test + public void testIdentityExtension() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: onConnect\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "Sec-WebSocket-Extensions: identity;param=0\r\n"+ + "Sec-WebSocket-Extensions: identity;param=1, identity ; param = '2' ; other = ' some = value ' \r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(1000); + + InputStream input = socket.getInputStream(); + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("Sec-WebSocket-Extensions: ",input); + lookFor("identity;param=0",input); + skipTo("Sec-WebSocket-Extensions: ",input); + lookFor("identity;param=1",input); + skipTo("Sec-WebSocket-Extensions: ",input); + lookFor("identity;",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + assertEquals(0x81,input.read()); + assertEquals(0x0f,input.read()); + lookFor("sent on connect",input); + } + + + @Test + public void testFragmentExtension() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: onConnect\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "Sec-WebSocket-Extensions: fragment;maxLength=4;minFragments=7\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(1000); + + InputStream input = socket.getInputStream(); + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("Sec-WebSocket-Extensions: ",input); + lookFor("fragment;",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + assertEquals(0x01,input.read()); + assertEquals(0x04,input.read()); + lookFor("sent",input); + assertEquals(0x00,input.read()); + assertEquals(0x04,input.read()); + lookFor(" on ",input); + assertEquals(0x00,input.read()); + assertEquals(0x04,input.read()); + lookFor("conn",input); + assertEquals(0x00,input.read()); + assertEquals(0x01,input.read()); + lookFor("e",input); + assertEquals(0x00,input.read()); + assertEquals(0x01,input.read()); + lookFor("c",input); + assertEquals(0x00,input.read()); + assertEquals(0x00,input.read()); + assertEquals(0x80,input.read()); + assertEquals(0x01,input.read()); + lookFor("t",input); + } + + @Test + public void testDeflateFrameExtension() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: echo\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "Sec-WebSocket-Extensions: x-deflate-frame;minLength=64\r\n"+ + "Sec-WebSocket-Extensions: fragment;minFragments=2\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(1000); + + InputStream input = socket.getInputStream(); + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("Sec-WebSocket-Extensions: ",input); + lookFor("x-deflate-frame;minLength=64",input); + skipTo("Sec-WebSocket-Extensions: ",input); + lookFor("fragment;",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + + // Server sends a big message + String text = "0123456789ABCDEF "; + text=text+text+text+text; + text=text+text+text+text; + text=text+text+text+text+'X'; + byte[] data=text.getBytes("utf-8"); + Deflater deflater = new Deflater(); + deflater.setInput(data); + deflater.finish(); + byte[] buf=new byte[data.length]; + + buf[0]=(byte)((byte)0x7e); + buf[1]=(byte)(data.length>>8); + buf[2]=(byte)(data.length&0xff); + + int l=deflater.deflate(buf,3,buf.length-3); + + assertTrue(deflater.finished()); + + output.write(0xC1); + output.write((byte)(0x80|(0xff&(l+3)))); + output.write(0x00); + output.write(0x00); + output.write(0x00); + output.write(0x00); + output.write(buf,0,l+3); + output.flush(); + + assertEquals(0x40+WebSocketConnectionD13.OP_TEXT,input.read()); + assertEquals(0x20+3,input.read()); + assertEquals(0x7e,input.read()); + assertEquals(0x02,input.read()); + assertEquals(0x20,input.read()); + + byte[] raw = new byte[32]; + assertEquals(32,input.read(raw)); + + Inflater inflater = new Inflater(); + inflater.setInput(raw); + + byte[] result = new byte[544]; + assertEquals(544,inflater.inflate(result)); + assertEquals(TypeUtil.toHexString(data,0,544),TypeUtil.toHexString(result)); + + + assertEquals((byte)0xC0,(byte)input.read()); + assertEquals(0x21+3,input.read()); + assertEquals(0x7e,input.read()); + assertEquals(0x02,input.read()); + assertEquals(0x21,input.read()); + + assertEquals(32,input.read(raw)); + + inflater.reset(); + inflater.setInput(raw); + result = new byte[545]; + assertEquals(545,inflater.inflate(result)); + assertEquals(TypeUtil.toHexString(data,544,545),TypeUtil.toHexString(result)); + + + } + + @Test + public void testServerEcho() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: echo\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + output.write(0x84); + output.write(0x8f); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + byte[] bytes="this is an echo".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i>> "+i); + output.flush(); + + long now=System.currentTimeMillis(); + long duration=now-start; + start=now; + if (max2000); // was blocked + } + + @Test + public void testBlockedProducer() throws Exception + { + final Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + + final int count = 100000; + + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: latch\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(60000); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + __serverWebSocket.connection.setMaxIdleTime(60000); + __latch.countDown(); + + // wait 2s and then consume messages + final AtomicLong totalB=new AtomicLong(); + new Thread() + { + public void run() + { + try + { + Thread.sleep(2000); + + byte[] recv = new byte[32*1024]; + + int len=0; + while (len>=0) + { + totalB.addAndGet(len); + len=socket.getInputStream().read(recv,0,recv.length); + Thread.sleep(10); + } + } + catch(Exception e) + { + e.printStackTrace(); + } + } + }.start(); + + + // Send enough messages to fill receive buffer + long max=0; + long start=System.currentTimeMillis(); + String mesg="How Now Brown Cow"; + for (int i=0;i1000); // was blocked + } + + @Test + public void testServerPingPong() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + // Make sure the read times out if there are problems with the implementation + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: echo\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + output.write(0x89); + output.write(0x80); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.flush(); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + socket.setSoTimeout(1000); + assertEquals(0x8A,input.read()); + assertEquals(0x00,input.read()); + } + + @Test + public void testMaxTextSizeFalseFrag() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(1000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + __serverWebSocket.getConnection().setMaxTextMessageSize(10*1024); + __serverWebSocket.getConnection().setFakeFragments(true); + + output.write(0x81); + output.write(0x80|0x7E); + output.write((byte)((16*1024)>>8)); + output.write((byte)((16*1024)&0xff)); + output.write(0x00); + output.write(0x00); + output.write(0x00); + output.write(0x00); + + for (int i=0;i<(16*1024);i++) + output.write('X'); + output.flush(); + + + assertEquals(0x80|WebSocketConnectionD13.OP_CLOSE,input.read()); + assertEquals(33,input.read()); + int code=(0xff&input.read())*0x100+(0xff&input.read()); + assertEquals(WebSocketConnectionD13.CLOSE_MESSAGE_TOO_LARGE,code); + lookFor("Text message size > 10240 chars",input); + } + + @Test + public void testMaxTextSize() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(1000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + __serverWebSocket.getConnection().setMaxTextMessageSize(15); + + output.write(0x01); + output.write(0x8a); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15 chars",input); + } + + + @Test + public void testMaxTextSize2() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(100000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + __serverWebSocket.getConnection().setMaxTextMessageSize(15); + + output.write(0x01); + output.write(0x94); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15 chars",input); + } + + @Test + public void testBinaryAggregate() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: aggregate\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(1000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + __serverWebSocket.getConnection().setMaxBinaryMessageSize(1024); + + output.write(WebSocketConnectionD13.OP_BINARY); + output.write(0x8a); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + byte[] bytes="0123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15",input); + } + + + @Test + public void testMaxBinarySize2() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: other\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + socket.setSoTimeout(100000); + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + __serverWebSocket.getConnection().setMaxBinaryMessageSize(15); + + output.write(0x02); + output.write(0x94); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + byte[] bytes="01234567890123456789".getBytes(StringUtil.__ISO_8859_1); + for (int i=0;i 15",input); + } + + @Test + public void testIdle() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: onConnect\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(10000); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + assertEquals(0x81,input.read()); + assertEquals(0x0f,input.read()); + lookFor("sent on connect",input); + + assertEquals((byte)0x88,(byte)input.read()); + assertEquals(26,input.read()); + assertEquals(1000/0x100,input.read()); + assertEquals(1000%0x100,input.read()); + lookFor("Idle",input); + + // respond to close + output.write(0x88^0xff); + output.write(0x80^0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.write(0xff); + output.flush(); + + + assertTrue(__serverWebSocket.awaitDisconnected(5000)); + try + { + __serverWebSocket.connection.sendMessage("Don't send"); + assertTrue(false); + } + catch(IOException e) + { + assertTrue(true); + } + } + + @Test + public void testClose() throws Exception + { + Socket socket = new Socket("localhost", __connector.getLocalPort()); + OutputStream output = socket.getOutputStream(); + output.write( + ("GET /chat HTTP/1.1\r\n"+ + "Host: server.example.com\r\n"+ + "Upgrade: websocket\r\n"+ + "Connection: Upgrade\r\n"+ + "Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==\r\n"+ + "Sec-WebSocket-Origin: http://example.com\r\n"+ + "Sec-WebSocket-Protocol: onConnect\r\n" + + "Sec-WebSocket-Version: "+WebSocketConnectionD13.VERSION+"\r\n"+ + "\r\n").getBytes("ISO-8859-1")); + output.flush(); + + // Make sure the read times out if there are problems with the implementation + socket.setSoTimeout(1000); + + InputStream input = socket.getInputStream(); + + lookFor("HTTP/1.1 101 Switching Protocols\r\n",input); + skipTo("Sec-WebSocket-Accept: ",input); + lookFor("s3pPLMBiTxaQ9kYGzzhZRbK+xOo=",input); + skipTo("\r\n\r\n",input); + + + assertTrue(__serverWebSocket.awaitConnected(1000)); + assertNotNull(__serverWebSocket.connection); + + assertEquals(0x81,input.read()); + assertEquals(0x0f,input.read()); + lookFor("sent on connect",input); + socket.close(); + + assertTrue(__serverWebSocket.awaitDisconnected(500)); + + + try + { + __serverWebSocket.connection.sendMessage("Don't send"); + assertTrue(false); + } + catch(IOException e) + { + assertTrue(true); + } + } + + @Test + public void testParserAndGenerator() throws Exception + { + String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF"; + final AtomicReference received = new AtomicReference(); + ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096); + + WebSocketGeneratorD13 gen = new WebSocketGeneratorD13(new WebSocketBuffers(8096),endp,null); + + byte[] data = message.getBytes(StringUtil.__UTF8); + gen.addFrame((byte)0x8,(byte)0x4,data,0,data.length); + + endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096); + + WebSocketParserD13 parser = new WebSocketParserD13(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler() + { + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + received.set(buffer.toString()); + } + + public void close(int code,String message) + { + } + + },false); + + parser.parseNext(); + + assertEquals(message,received.get()); + } + + @Test + public void testParserAndGeneratorMasked() throws Exception + { + String message = "0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF0123456789ABCDEF"; + final AtomicReference received = new AtomicReference(); + ByteArrayEndPoint endp = new ByteArrayEndPoint(new byte[0],4096); + + MaskGen maskGen = new RandomMaskGen(); + + WebSocketGeneratorD13 gen = new WebSocketGeneratorD13(new WebSocketBuffers(8096),endp,maskGen); + byte[] data = message.getBytes(StringUtil.__UTF8); + gen.addFrame((byte)0x8,(byte)0x1,data,0,data.length); + + endp = new ByteArrayEndPoint(endp.getOut().asArray(),4096); + + WebSocketParserD13 parser = new WebSocketParserD13(new WebSocketBuffers(8096),endp,new WebSocketParser.FrameHandler() + { + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + received.set(buffer.toString()); + } + + public void close(int code,String message) + { + } + },true); + + parser.parseNext(); + + assertEquals(message,received.get()); + } + + + private void lookFor(String string,InputStream in) + throws IOException + { + String orig=string; + Utf8StringBuilder scanned=new Utf8StringBuilder(); + try + { + while(true) + { + int b = in.read(); + if (b<0) + throw new EOFException(); + scanned.append((byte)b); + assertEquals("looking for\""+orig+"\" in '"+scanned+"'",(int)string.charAt(0),b); + if (string.length()==1) + break; + string=string.substring(1); + } + } + catch(IOException e) + { + System.err.println("IOE while looking for \""+orig+"\" in '"+scanned+"'"); + throw e; + } + } + + private void skipTo(String string,InputStream in) + throws IOException + { + int state=0; + + while(true) + { + int b = in.read(); + if (b<0) + throw new EOFException(); + + if (b==string.charAt(state)) + { + state++; + if (state==string.length()) + break; + } + else + state=0; + } + } + + + private static class TestWebSocket implements WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage + { + protected boolean _latch; + boolean _onConnect=false; + boolean _echo=true; + boolean _aggregate=false; + private final CountDownLatch connected = new CountDownLatch(1); + private final CountDownLatch disconnected = new CountDownLatch(1); + private volatile FrameConnection connection; + + public FrameConnection getConnection() + { + return connection; + } + + public void onHandshake(FrameConnection connection) + { + this.connection = connection; + } + + public void onOpen(Connection connection) + { + if (_onConnect) + { + try + { + connection.sendMessage("sent on connect"); + } + catch(IOException e) + { + e.printStackTrace(); + } + } + connected.countDown(); + } + + private boolean awaitConnected(long time) throws InterruptedException + { + return connected.await(time, TimeUnit.MILLISECONDS); + } + + private boolean awaitDisconnected(long time) throws InterruptedException + { + return disconnected.await(time, TimeUnit.MILLISECONDS); + } + + public void onClose(int code,String message) + { + disconnected.countDown(); + } + + public boolean onFrame(byte flags, byte opcode, byte[] data, int offset, int length) + { + if (_echo) + { + switch(opcode) + { + case WebSocketConnectionD13.OP_CLOSE: + case WebSocketConnectionD13.OP_PING: + case WebSocketConnectionD13.OP_PONG: + break; + + default: + try + { + connection.sendFrame(flags,opcode,data,offset,length); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + return false; + } + + public void onMessage(byte[] data, int offset, int length) + { + if (_aggregate) + { + try + { + connection.sendMessage(data,offset,length); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + public void onMessage(String data) + { + __textCount.incrementAndGet(); + if (_latch) + { + try + { + __latch.await(); + } + catch(Exception e) + { + e.printStackTrace(); + } + } + + if (_aggregate) + { + try + { + connection.sendMessage(data); + } + catch (IOException e) + { + e.printStackTrace(); + } + } + } + + } +} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD13Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD13Test.java new file mode 100644 index 00000000000..4378e1ba8d6 --- /dev/null +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD13Test.java @@ -0,0 +1,359 @@ +package org.eclipse.jetty.websocket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.http.HttpHeaderValues; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.BufferCache.CachedBuffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.junit.Before; +import org.junit.Test; + +/** + * @version $Revision$ $Date$ + */ +public class WebSocketParserD13Test +{ + private MaskedByteArrayBuffer _in; + private Handler _handler; + private WebSocketParserD13 _parser; + private byte[] _mask = new byte[] {(byte)0x00,(byte)0xF0,(byte)0x0F,(byte)0xFF}; + private int _m; + + class MaskedByteArrayBuffer extends ByteArrayBuffer + { + MaskedByteArrayBuffer() + { + super(4096); + } + + public void sendMask() + { + super.poke(putIndex(),_mask,0,4); + super.setPutIndex(putIndex()+4); + _m=0; + } + + @Override + public int put(Buffer src) + { + return put(src.asArray(),0,src.length()); + } + + public void putUnmasked(byte b) + { + super.put(b); + } + + @Override + public void put(byte b) + { + super.put((byte)(b^_mask[_m++%4])); + } + + @Override + public int put(byte[] b, int offset, int length) + { + byte[] mb = new byte[b.length]; + final int end=offset+length; + for (int i=offset;i0); + assertEquals(0xf,_handler._flags); + assertEquals(0xf,_handler._opcode); + assertTrue(_parser.isBufferEmpty()); + _parser.returnBuffer(); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testShortText() throws Exception + { + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)(0x80|11)); + _in.sendMask(); + _in.put("Hello World".getBytes(StringUtil.__UTF8)); + // System.err.println("tosend="+TypeUtil.toHexString(_in.asArray())); + + int progress =_parser.parseNext(); + + assertEquals(18,progress); + assertEquals("Hello World",_handler._data.get(0)); + assertEquals(0x8,_handler._flags); + assertEquals(0x1,_handler._opcode); + assertTrue(_parser.isBufferEmpty()); + _parser.returnBuffer(); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testShortUtf8() throws Exception + { + String string = "Hell\uFF4f W\uFF4Frld"; + byte[] bytes = string.getBytes("UTF-8"); + + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)(0x80|bytes.length)); + _in.sendMask(); + _in.put(bytes); + + int progress =_parser.parseNext(); + + assertEquals(bytes.length+7,progress); + assertEquals(string,_handler._data.get(0)); + assertEquals(0x8,_handler._flags); + assertEquals(0x1,_handler._opcode); + _parser.returnBuffer(); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testMediumText() throws Exception + { + String string = "Hell\uFF4f Medium W\uFF4Frld "; + for (int i=0;i<4;i++) + string = string+string; + string += ". The end."; + + byte[] bytes = string.getBytes(StringUtil.__UTF8); + + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)(0x80|0x7E)); + _in.putUnmasked((byte)(bytes.length>>8)); + _in.putUnmasked((byte)(bytes.length&0xff)); + _in.sendMask(); + _in.put(bytes); + + int progress =_parser.parseNext(); + + assertEquals(bytes.length+9,progress); + assertEquals(string,_handler._data.get(0)); + assertEquals(0x8,_handler._flags); + assertEquals(0x1,_handler._opcode); + _parser.returnBuffer(); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testLongText() throws Exception + { + WebSocketBuffers buffers = new WebSocketBuffers(0x20000); + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + WebSocketParserD13 parser=new WebSocketParserD13(buffers, endPoint,_handler,false); + ByteArrayBuffer in = new ByteArrayBuffer(0x20000); + endPoint.setIn(in); + + String string = "Hell\uFF4f Big W\uFF4Frld "; + for (int i=0;i<12;i++) + string = string+string; + string += ". The end."; + + byte[] bytes = string.getBytes("UTF-8"); + + _in.sendMask(); + in.put((byte)0x84); + in.put((byte)0x7F); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)(bytes.length>>16)); + in.put((byte)((bytes.length>>8)&0xff)); + in.put((byte)(bytes.length&0xff)); + in.put(bytes); + + int progress =parser.parseNext(); + parser.returnBuffer(); + + assertEquals(bytes.length+11,progress); + assertEquals(string,_handler._data.get(0)); + assertTrue(parser.isBufferEmpty()); + assertTrue(parser.getBuffer()==null); + } + + @Test + public void testShortFragmentTest() throws Exception + { + _in.putUnmasked((byte)0x01); + _in.putUnmasked((byte)0x86); + _in.sendMask(); + _in.put("Hello ".getBytes(StringUtil.__UTF8)); + _in.putUnmasked((byte)0x80); + _in.putUnmasked((byte)0x85); + _in.sendMask(); + _in.put("World".getBytes(StringUtil.__UTF8)); + + int progress =_parser.parseNext(); + + assertEquals(24,progress); + assertEquals(0,_handler._data.size()); + assertFalse(_parser.isBufferEmpty()); + assertFalse(_parser.getBuffer()==null); + + progress =_parser.parseNext(); + _parser.returnBuffer(); + + assertEquals(1,progress); + assertEquals("Hello World",_handler._data.get(0)); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testFrameTooLarge() throws Exception + { + // Buffers are only 1024, so this frame is too large + _parser.setFakeFragments(false); + + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)(0x80|0x7E)); + _in.putUnmasked((byte)(2048>>8)); + _in.putUnmasked((byte)(2048&0xff)); + _in.sendMask(); + + int progress =_parser.parseNext(); + + assertTrue(progress>0); + + assertEquals(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,_handler._code); + for (int i=0;i<2048;i++) + _in.put((byte)'a'); + progress =_parser.parseNext(); + + assertEquals(2048,progress); + assertEquals(0,_handler._data.size()); + assertEquals(0,_handler._utf8.length()); + + _handler._code=0; + _handler._message=null; + + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)0xFE); + _in.putUnmasked((byte)(1024>>8)); + _in.putUnmasked((byte)(1024&0xff)); + _in.sendMask(); + for (int i=0;i<1024;i++) + _in.put((byte)'a'); + + progress =_parser.parseNext(); + assertTrue(progress>0); + assertEquals(1,_handler._data.size()); + assertEquals(1024,_handler._data.get(0).length()); + } + + @Test + public void testFakeFragement() throws Exception + { + // Buffers are only 1024, so this frame will be fake fragmented + _parser.setFakeFragments(true); + + _in.putUnmasked((byte)0x81); + _in.putUnmasked((byte)(0x80|0x7E)); + _in.putUnmasked((byte)(2048>>8)); + _in.putUnmasked((byte)(2048&0xff)); + _in.sendMask(); + for (int i=0;i<2048;i++) + _in.put((byte)('a'+i%26)); + + int progress =_parser.parseNext(); + assertTrue(progress>0); + + assertEquals(2,_handler._frames); + assertEquals(WebSocketConnectionD13.OP_CONTINUATION,_handler._opcode); + assertEquals(1,_handler._data.size()); + String mesg=_handler._data.remove(0); + + assertEquals(2048,mesg.length()); + + for (int i=0;i<2048;i++) + assertEquals(('a'+i%26),mesg.charAt(i)); + } + + private class Handler implements WebSocketParser.FrameHandler + { + Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + public List _data = new ArrayList(); + private byte _flags; + private byte _opcode; + int _code; + String _message; + int _frames; + + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + _frames++; + _flags=flags; + _opcode=opcode; + if ((flags&0x8)==0) + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + else if (_utf8.length()==0) + _data.add(buffer.toString("utf-8")); + else + { + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + _data.add(_utf8.toString()); + _utf8.reset(); + } + } + + public void close(int code,String message) + { + _code=code; + _message=message; + } + } +}