From a4f122930a82d951853f172f106fe8d18b17f5d9 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Tue, 22 Feb 2011 03:32:07 +0000 Subject: [PATCH] 337685 Work in progress on draft 5 websockets git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2810 7e9141cc-0065-0410-87d8-b60c137991c4 --- .../jetty/client/WebSocketUpgradeTest.java | 2 +- .../jetty/websocket/FrameHandlerD5.java | 94 +++++ .../websocket/WebSocketConnectionD05.java | 373 ++++++++++++++++++ .../jetty/websocket/WebSocketFactory.java | 3 + .../websocket/WebSocketGeneratorD05.java | 198 ++++++++++ .../jetty/websocket/WebSocketParserD05.java | 257 ++++++++++++ .../websocket/WebSocketParserD05Test.java | 244 ++++++++++++ 7 files changed, 1170 insertions(+), 1 deletion(-) create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD5.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD05.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD05.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD05.java create mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD05Test.java diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java index 1b4bfae0f93..e615d3aee4a 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java @@ -156,7 +156,7 @@ public class WebSocketUpgradeTest extends TestCase int status = httpExchange.waitForDone(); assertEquals(HttpExchange.STATUS_COMPLETED, status); - System.err.println("results="+_results); + // System.err.println("results="+_results); assertEquals("serverWS.onConnect", _results.poll(1,TimeUnit.SECONDS)); TestWebSocket serverWS = (TestWebSocket)_results.poll(1,TimeUnit.SECONDS); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD5.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD5.java new file mode 100644 index 00000000000..3169b1e8c7e --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/FrameHandlerD5.java @@ -0,0 +1,94 @@ +// ======================================================================== +// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.websocket; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.eclipse.jetty.util.log.Log; + +final class FrameHandlerD5 implements WebSocketParser.FrameHandler +{ + public final static byte PING=1; + public final static byte PONG=1; + + final WebSocketConnectionD05 _connection; + final WebSocket _websocket; + final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + boolean _fragmented=false; + + FrameHandlerD5(WebSocketConnectionD05 connection, WebSocket websocket) + { + _connection=connection; + _websocket=websocket; + } + + public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + { + try + { + byte[] array=buffer.array(); + + if (opcode==0) + { + if (more) + { + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + _fragmented=true; + } + else if (_fragmented) + { + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + _websocket.onMessage(opcode,_utf8.toString()); + _utf8.reset(); + _fragmented=false; + } + else + { + _websocket.onMessage(opcode,buffer.toString("utf-8")); + } + } + else if (opcode==PING) + { + _connection.sendMessage(PONG,buffer.array(),buffer.getIndex(),buffer.length()); + } + else if (opcode==PONG) + { + + } + else + { + if (more) + { + _websocket.onFragment(true,opcode,array,buffer.getIndex(),buffer.length()); + } + else if (_fragmented) + { + _websocket.onFragment(false,opcode,array,buffer.getIndex(),buffer.length()); + } + else + { + _websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length()); + } + } + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); + } + } +} \ No newline at end of file diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD05.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD05.java new file mode 100644 index 00000000000..29da82338ee --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD05.java @@ -0,0 +1,373 @@ +// ======================================================================== +// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.websocket; + +import java.io.IOException; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; + +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.io.AsyncEndPoint; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.nio.IndirectNIOBuffer; +import org.eclipse.jetty.io.nio.SelectChannelEndPoint; +import org.eclipse.jetty.util.log.Log; + +public class WebSocketConnectionD05 implements WebSocketConnection +{ + final IdleCheck _idle; + final EndPoint _endp; + final WebSocketParser _parser; + final WebSocketGenerator _generator; + final long _timestamp; + final WebSocket _websocket; + String _key1; + String _key2; + ByteArrayBuffer _hixieBytes; + + public WebSocketConnectionD05(WebSocket websocket, EndPoint endpoint,int draft) + throws IOException + { + this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000,draft); + } + + public WebSocketConnectionD05(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, int draft) + throws IOException + { + // TODO - can we use the endpoint idle mechanism? + if (endpoint instanceof AsyncEndPoint) + ((AsyncEndPoint)endpoint).cancelIdle(); + + _endp = endpoint; + _endp.setMaxIdleTime(maxIdleTime); + + _timestamp = timestamp; + _websocket = websocket; + _generator = new WebSocketGeneratorD05(buffers, _endp); + _parser = new WebSocketParserD05(buffers, endpoint, new FrameHandlerD5(this,_websocket),true); + + // TODO should these be AsyncEndPoint checks/calls? + if (_endp instanceof SelectChannelEndPoint) + { + final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp; + scep.cancelIdle(); + _idle=new IdleCheck() + { + public void access(EndPoint endp) + { + scep.scheduleIdle(); + } + }; + scep.scheduleIdle(); + } + else + { + _idle = new IdleCheck() + { + public void access(EndPoint endp) + {} + }; + } + } + + public void setHixieKeys(String key1,String key2) + { + _key1=key1; + _key2=key2; + _hixieBytes=new IndirectNIOBuffer(16); + } + + public Connection handle() throws IOException + { + try + { + // handle stupid hixie random bytes + if (_hixieBytes!=null) + { + + // take any available bytes from the parser buffer, which may have already been read + Buffer buffer=_parser.getBuffer(); + if (buffer!=null && buffer.length()>0) + { + int l=buffer.length(); + if (l>(8-_hixieBytes.length())) + l=8-_hixieBytes.length(); + _hixieBytes.put(buffer.peek(buffer.getIndex(),l)); + buffer.skip(l); + } + + // while we are not blocked + while(_endp.isOpen()) + { + // do we now have enough + if (_hixieBytes.length()==8) + { + // we have the silly random bytes + // so let's work out the stupid 16 byte reply. + doTheHixieHixieShake(); + _endp.flush(_hixieBytes); + _hixieBytes=null; + _endp.flush(); + break; + } + + // no, then let's fill + int filled=_endp.fill(_hixieBytes); + if (filled<0) + { + _endp.close(); + break; + } + } + + _websocket.onConnect(this); + return this; + } + + // handle the framing protocol + boolean progress=true; + + while (progress) + { + int flushed=_generator.flush(); + int filled=_parser.parseNext(); + + progress = flushed>0 || filled>0; + + if (filled<0 || flushed<0) + { + _endp.close(); + break; + } + } + } + catch(IOException e) + { + try + { + _endp.close(); + } + catch(IOException e2) + { + Log.ignore(e2); + } + throw e; + } + finally + { + if (_endp.isOpen()) + { + _idle.access(_endp); + checkWriteable(); + } + } + return this; + } + + private void doTheHixieHixieShake() + { + byte[] result=WebSocketConnectionD05.doTheHixieHixieShake( + WebSocketConnectionD05.hixieCrypt(_key1), + WebSocketConnectionD05.hixieCrypt(_key2), + _hixieBytes.asArray()); + _hixieBytes.clear(); + _hixieBytes.put(result); + } + + public boolean isOpen() + { + return _endp!=null&&_endp.isOpen(); + } + + public boolean isIdle() + { + return _parser.isBufferEmpty() && _generator.isBufferEmpty(); + } + + public boolean isSuspended() + { + return false; + } + + public void closed() + { + _websocket.onDisconnect(); + } + + public long getTimeStamp() + { + return _timestamp; + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(java.lang.String) + */ + public void sendMessage(String content) throws IOException + { + sendMessage(WebSocket.SENTINEL_FRAME,content); + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String) + */ + public void sendMessage(byte frame, String content) throws IOException + { + _generator.addFrame(frame,content,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int) + */ + public void sendMessage(byte opcode, byte[] content, int offset, int length) throws IOException + { + _generator.addFrame(opcode,content,offset,length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendFragment(boolean, byte, byte[], int, int) + */ + public void sendFragment(boolean more,byte opcode, byte[] content, int offset, int length) throws IOException + { + _generator.addFragment(more,opcode,content,offset,length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + public void disconnect() + { + try + { + _generator.flush(_endp.getMaxIdleTime()); + _endp.close(); + } + catch(IOException e) + { + Log.ignore(e); + } + } + + public void fillBuffersFrom(Buffer buffer) + { + _parser.fill(buffer); + } + + + private void checkWriteable() + { + if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint) + ((AsyncEndPoint)_endp).scheduleWrite(); + } + + /* ------------------------------------------------------------ */ + static long hixieCrypt(String key) + { + // Don't ask me what all this is about. + // I think it's pretend secret stuff, kind of + // like talking in pig latin! + long number=0; + int spaces=0; + for (char c : key.toCharArray()) + { + if (Character.isDigit(c)) + number=number*10+(c-'0'); + else if (c==' ') + spaces++; + } + return number/spaces; + } + + public static byte[] doTheHixieHixieShake(long key1,long key2,byte[] key3) + { + try + { + MessageDigest md = MessageDigest.getInstance("MD5"); + byte [] fodder = new byte[16]; + + fodder[0]=(byte)(0xff&(key1>>24)); + fodder[1]=(byte)(0xff&(key1>>16)); + fodder[2]=(byte)(0xff&(key1>>8)); + fodder[3]=(byte)(0xff&key1); + fodder[4]=(byte)(0xff&(key2>>24)); + fodder[5]=(byte)(0xff&(key2>>16)); + fodder[6]=(byte)(0xff&(key2>>8)); + fodder[7]=(byte)(0xff&key2); + for (int i=0;i<8;i++) + fodder[8+i]=key3[i]; + md.update(fodder); + byte[] result=md.digest(); + return result; + } + catch (NoSuchAlgorithmException e) + { + throw new IllegalStateException(e); + } + } + + private interface IdleCheck + { + void access(EndPoint endp); + } + + public void handshake(HttpServletRequest request, HttpServletResponse response, String origin, String subprotocol) throws IOException + { + String uri=request.getRequestURI(); + String query=request.getQueryString(); + if (query!=null && query.length()>0) + uri+="?"+query; + String host=request.getHeader("Host"); + + String key1 = request.getHeader("Sec-WebSocket-Key1"); + if (key1!=null) + { + String key2 = request.getHeader("Sec-WebSocket-Key2"); + setHixieKeys(key1,key2); + + response.setHeader("Upgrade","WebSocket"); + response.addHeader("Connection","Upgrade"); + response.addHeader("Sec-WebSocket-Origin",origin); + response.addHeader("Sec-WebSocket-Location",(request.isSecure()?"wss://":"ws://")+host+uri); + if (subprotocol!=null) + response.addHeader("Sec-WebSocket-Protocol",subprotocol); + response.sendError(101,"WebSocket Protocol Handshake"); + } + else + { + response.setHeader("Upgrade","WebSocket"); + response.addHeader("Connection","Upgrade"); + response.addHeader("WebSocket-Origin",origin); + response.addHeader("WebSocket-Location",(request.isSecure()?"wss://":"ws://")+host+uri); + if (subprotocol!=null) + response.addHeader("WebSocket-Protocol",subprotocol); + response.sendError(101,"Web Socket Protocol Handshake"); + response.flushBuffer(); + _websocket.onConnect(this); + } + } +} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java index 18bdc6c01ee..e24ec0359a4 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java @@ -107,6 +107,9 @@ public class WebSocketFactory final WebSocketConnection connection; switch(draft) { + case 5: + connection=new WebSocketConnectionD05(websocket,endp,_buffers,http.getTimeStamp(), _maxIdleTime,draft); + break; default: connection=new WebSocketConnectionD00(websocket,endp,_buffers,http.getTimeStamp(), _maxIdleTime,draft); } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD05.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD05.java new file mode 100644 index 00000000000..a07afc9ecf8 --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD05.java @@ -0,0 +1,198 @@ +// ======================================================================== +// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.websocket; + +import java.io.IOException; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.io.EofException; + + +/* ------------------------------------------------------------ */ +/** WebSocketGenerator. + * This class generates websocket packets. + * It is fully synchronized because it is likely that async + * threads will call the addMessage methods while other + * threads are flushing the generator. + */ +public class WebSocketGeneratorD05 implements WebSocketGenerator +{ + final private WebSocketBuffers _buffers; + final private EndPoint _endp; + private Buffer _buffer; + + public WebSocketGeneratorD05(WebSocketBuffers buffers, EndPoint endp) + { + _buffers=buffers; + _endp=endp; + } + + public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException + { + addFrame(opcode,content,0,content.length,blockFor); + } + + + public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException + { + addFragment(false,opcode,content,offset,length,blockFor); + } + + public synchronized void addFragment(boolean more, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException + { + if (_buffer==null) + _buffer=_buffers.getDirectBuffer(); + + if (_buffer.space() == 0) + expelBuffer(blockFor); + + opcode = (byte)(opcode & 0x0f); + + while (length>0) + { + // slice a fragment off + int fragment=length; + if (fragment+10>_buffer.capacity()) + { + fragment=_buffer.capacity()-10; + bufferPut((byte)(0x80|opcode), blockFor); + } + else if (more) + bufferPut((byte)(0x80|opcode), blockFor); + else + bufferPut(opcode, blockFor); + + if (fragment>0xffff) + { + bufferPut((byte)0x7f, blockFor); + bufferPut((byte)((fragment>>56)&0x7f), blockFor); + bufferPut((byte)((fragment>>48)&0xff), blockFor); + bufferPut((byte)((fragment>>40)&0xff), blockFor); + bufferPut((byte)((fragment>>32)&0xff), blockFor); + bufferPut((byte)((fragment>>24)&0xff), blockFor); + bufferPut((byte)((fragment>>16)&0xff), blockFor); + bufferPut((byte)((fragment>>8)&0xff), blockFor); + bufferPut((byte)(fragment&0xff), blockFor); + } + else if (fragment >=0x7e) + { + bufferPut((byte)126, blockFor); + bufferPut((byte)(fragment>>8), blockFor); + bufferPut((byte)(fragment&0xff), blockFor); + } + else + { + bufferPut((byte)fragment, blockFor); + } + + int remaining = fragment; + while (remaining > 0) + { + _buffer.compact(); + int chunk = remaining < _buffer.space() ? remaining : _buffer.space(); + _buffer.put(content, offset + (fragment - remaining), chunk); + remaining -= chunk; + if (_buffer.space() > 0) + { + // Gently flush the data, issuing a non-blocking write + flushBuffer(); + } + else + { + // Forcibly flush the data, issuing a blocking write + expelBuffer(blockFor); + if (remaining == 0) + { + // Gently flush the data, issuing a non-blocking write + flushBuffer(); + } + } + } + offset+=fragment; + length-=fragment; + } + } + + private synchronized void bufferPut(byte datum, long blockFor) throws IOException + { + if (_buffer==null) + _buffer=_buffers.getDirectBuffer(); + _buffer.put(datum); + if (_buffer.space() == 0) + expelBuffer(blockFor); + } + + public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException + { + byte[] bytes = content.getBytes("UTF-8"); + addFrame(frame, bytes, 0, bytes.length, blockFor); + } + + public synchronized int flush(int blockFor) throws IOException + { + return expelBuffer(blockFor); + } + + public synchronized int flush() throws IOException + { + int flushed = flushBuffer(); + if (_buffer!=null && _buffer.length()==0) + { + _buffers.returnBuffer(_buffer); + _buffer=null; + } + return flushed; + } + + private synchronized int flushBuffer() throws IOException + { + if (!_endp.isOpen()) + throw new EofException(); + + if (_buffer!=null) + return _endp.flush(_buffer); + + return 0; + } + + private synchronized int expelBuffer(long blockFor) throws IOException + { + if (_buffer==null) + return 0; + int result = flushBuffer(); + _buffer.compact(); + if (!_endp.isBlocking()) + { + while (_buffer.space()==0) + { + // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot + // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout + boolean ready = _endp.blockWritable(blockFor); + if (!ready) + throw new IOException("Write timeout"); + + result += flushBuffer(); + _buffer.compact(); + } + } + return result; + } + + public synchronized boolean isBufferEmpty() + { + return _buffer==null || _buffer.length()==0; + } + +} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD05.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD05.java new file mode 100644 index 00000000000..36079b2e9a4 --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD05.java @@ -0,0 +1,257 @@ +// ======================================================================== +// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + +package org.eclipse.jetty.websocket; + +import java.io.IOException; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.Buffers; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.eclipse.jetty.util.log.Log; + + + +/* ------------------------------------------------------------ */ +/** + * Parser the WebSocket protocol. + * + */ +public class WebSocketParserD05 implements WebSocketParser +{ + public enum State { + MASK(0), OPCODE(1), LENGTH_7(2), LENGTH_16(4), LENGTH_63(10), DATA(10); + + int _minSize; + + State(int minSize) + { + _minSize=minSize; + } + + int getMinSize() + { + return _minSize; + } + }; + + + private final WebSocketBuffers _buffers; + private final EndPoint _endp; + private final FrameHandler _handler; + private final boolean _masked; + private State _state; + private Buffer _buffer; + private boolean _more; + private byte _flags; + private byte _opcode; + private int _count; + private long _length; + private Utf8StringBuilder _utf8; + private final byte[] _mask = new byte[4]; + private int _m; + + /* ------------------------------------------------------------ */ + /** + * @param buffers The buffers to use for parsing. Only the {@link Buffers#getBuffer()} is used. + * This should be a direct buffer if binary data is mostly used or an indirect buffer if utf-8 data + * is mostly used. + * @param endp + * @param handler + */ + public WebSocketParserD05(WebSocketBuffers buffers, EndPoint endp, FrameHandler handler, boolean masked) + { + _buffers=buffers; + _endp=endp; + _handler=handler; + _masked=masked; + _state=_masked?State.MASK:State.OPCODE; + } + + /* ------------------------------------------------------------ */ + public boolean isBufferEmpty() + { + return _buffer==null || _buffer.length()==0; + } + + /* ------------------------------------------------------------ */ + public Buffer getBuffer() + { + return _buffer; + } + + /* ------------------------------------------------------------ */ + /** Parse to next event. + * Parse to the next {@link WebSocketParser.FrameHandler} event or until no more data is + * available. Fill data from the {@link EndPoint} only as necessary. + * @return An indication of progress or otherwise. -1 indicates EOF, 0 indicates + * that no bytes were read and no messages parsed. A positive number indicates either + * the bytes filled or the messages parsed. + */ + public int parseNext() + { + if (_buffer==null) + _buffer=_buffers.getBuffer(); + + int total_filled=0; + + // Loop until an datagram call back or can't fill anymore + while(true) + { + int available=_buffer.length(); + + // Fill buffer if we need a byte or need length + if (available < (_state.getMinSize() + (_masked?4:0)) || _state==State.DATA && available<_count) + { + // compact to mark (set at start of data) + _buffer.compact(); + + // if no space, then the data is too big for buffer + if (_buffer.space() == 0) + throw new IllegalStateException("FULL"); + + // catch IOExceptions (probably EOF) and try to parse what we have + try + { + int filled=_endp.isOpen()?_endp.fill(_buffer):-1; + if (filled<=0) + return total_filled; + total_filled+=filled; + available=_buffer.length(); + } + catch(IOException e) + { + Log.debug(e); + return total_filled>0?total_filled:-1; + } + } + + // if we are here, then we have sufficient bytes to process the current state. + + // Parse the buffer byte by byte (unless it is STATE_DATA) + byte b; + while (_state!=State.DATA && available-->0) + { + switch (_state) + { + case MASK: + _buffer.get(_mask,0,4); + _state=State.OPCODE; + _m=0; + continue; + + case OPCODE: + b=_buffer.get(); + if (_masked) + b^=_mask[_m++%4]; + _opcode=(byte)(b&0xf); + _flags=(byte)(b>>4); + _more=(_flags&8)!=0; + _state=State.LENGTH_7; + continue; + + case LENGTH_7: + b=_buffer.get(); + if (_masked) + b^=_mask[_m++%4]; + switch(b) + { + case 127: + _length=0; + _count=8; + _state=State.LENGTH_63; + break; + case 126: + _length=0; + _count=2; + _state=State.LENGTH_16; + break; + default: + _length=(0x7f&b); + _count=(int)_length; + _state=State.DATA; + } + continue; + + case LENGTH_16: + b=_buffer.get(); + if (_masked) + b^=_mask[_m++%4]; + _length = _length<<8 | b; + if (--_count==0) + { + if (_length>=_buffer.capacity()-4) + throw new IllegalStateException("TOO LARGE"); + _count=(int)_length; + _state=State.DATA; + } + continue; + + case LENGTH_63: + b=_buffer.get(); + if (_masked) + b^=_mask[_m++%4]; + _length = _length<<8 | b; + if (--_count==0) + { + if (_length>=_buffer.capacity()-10) + throw new IllegalStateException("TOO LARGE"); + _count=(int)_length; + _state=State.DATA; + } + continue; + } + } + + if (_state==State.DATA && available>=_count) + { + Buffer data =_buffer.get(_count); + if (_masked) + { + if (data.array()==null) + data=_buffer.asMutableBuffer(); + byte[] array = data.array(); + for (int i=data.length();i-->0;) + array[data.getIndex()+i]^=_mask[_m++%4]; + } + + _handler.onFrame(_more,_flags, _opcode, data); + _count=0; + _state=State.OPCODE; + + if (_buffer.length()==0) + { + _buffers.returnBuffer(_buffer); + _buffer=null; + } + + return total_filled; + + } + } + } + + /* ------------------------------------------------------------ */ + public void fill(Buffer buffer) + { + if (buffer!=null && buffer.length()>0) + { + if (_buffer==null) + _buffer=_buffers.getBuffer(); + _buffer.put(buffer); + buffer.clear(); + } + } + +} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD05Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD05Test.java new file mode 100644 index 00000000000..904ab589819 --- /dev/null +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD05Test.java @@ -0,0 +1,244 @@ +package org.eclipse.jetty.websocket; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; + +import java.util.ArrayList; +import java.util.List; + +import org.eclipse.jetty.http.HttpHeaderValues; +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.BufferCache.CachedBuffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.Utf8StringBuilder; +import org.junit.Before; +import org.junit.Test; + +/** + * @version $Revision$ $Date$ + */ +public class WebSocketParserD05Test +{ + private ByteArrayBuffer _in; + private Handler _handler; + private WebSocketParser _parser; + private byte[] _mask = new byte[] {(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff}; + + @Before + public void setUp() throws Exception + { + WebSocketBuffers buffers = new WebSocketBuffers(1024); + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + _handler = new Handler(); + _parser=new WebSocketParserD05(buffers, endPoint,_handler,true); + _in = new ByteArrayBuffer(2048) + { + { + // add the mask + super.put(_mask,0,4); + } + + @Override + public void poke(int index, byte b) + { + super.poke(index,(byte)(b^_mask[index%4])); + } + + @Override + public int poke(int index, Buffer src) + { + return poke(index,src.asArray(),0,src.length()); + } + + @Override + public int poke(int index, byte[] b, int offset, int length) + { + byte[] mb = new byte[b.length]; + for (int i=length;i-->0;) + mb[offset+i]=(byte)(b[offset+i]^_mask[(index+i)%4]); + return super.poke(index,mb,offset,length); + } + + @Override + public int put(Buffer src) + { + return put(src.asArray(),0,src.length()); + } + + @Override + public void put(byte b) + { + super.put((byte)(b^_mask[getIndex()%4])); + } + + @Override + public int put(byte[] b, int offset, int length) + { + byte[] mb = new byte[b.length]; + for (int i=length;i-->0;) + mb[offset+i]=(byte)(b[offset+i]^_mask[(getIndex()+i)%4]); + return super.put(mb,offset,length); + } + + @Override + public int put(byte[] b) + { + return put(b,0,b.length); + } + + }; + + + endPoint.setIn(_in); + } + + @Test + public void testCache() throws Exception + { + assertEquals(HttpHeaderValues.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValues.CACHE.lookup("Upgrade")).getOrdinal()); + } + + @Test + public void testShortText() throws Exception + { + _in.put((byte)0x00); + _in.put((byte)11); + _in.put("Hello World".getBytes(StringUtil.__UTF8)); + + int filled =_parser.parseNext(); + + assertEquals(17,filled); + assertEquals("Hello World",_handler._data.get(0)); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testShortUtf8() throws Exception + { + String string = "Hell\uFF4f W\uFF4Frld"; + byte[] bytes = string.getBytes("UTF-8"); + + _in.put((byte)0x00); + _in.put((byte)bytes.length); + _in.put(bytes); + + int filled =_parser.parseNext(); + + assertEquals(bytes.length+6,filled); + assertEquals(string,_handler._data.get(0)); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testMediumText() throws Exception + { + String string = "Hell\uFF4f Medium W\uFF4Frld "; + for (int i=0;i<4;i++) + string = string+string; + string += ". The end."; + + byte[] bytes = string.getBytes("UTF-8"); + + _in.put((byte)0x00); + _in.put((byte)0x7E); + _in.put((byte)(bytes.length>>8)); + _in.put((byte)(bytes.length&0xff)); + _in.put(bytes); + + int filled =_parser.parseNext(); + + assertEquals(bytes.length+8,filled); + assertEquals(string,_handler._data.get(0)); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + @Test + public void testLongText() throws Exception + { + + WebSocketBuffers buffers = new WebSocketBuffers(0x20000); + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); + WebSocketParser parser=new WebSocketParserD01(buffers, endPoint,_handler); + ByteArrayBuffer in = new ByteArrayBuffer(0x20000); + endPoint.setIn(in); + + + String string = "Hell\uFF4f Big W\uFF4Frld "; + for (int i=0;i<12;i++) + string = string+string; + string += ". The end."; + + byte[] bytes = string.getBytes("UTF-8"); + + in.put((byte)0x00); + in.put((byte)0x7F); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)0x00); + in.put((byte)(bytes.length>>16)); + in.put((byte)((bytes.length>>8)&0xff)); + in.put((byte)(bytes.length&0xff)); + in.put(bytes); + + int filled =parser.parseNext(); + + assertEquals(bytes.length+10,filled); + assertEquals(string,_handler._data.get(0)); + assertTrue(parser.isBufferEmpty()); + assertTrue(parser.getBuffer()==null); + } + + @Test + public void testShortFragmentTest() throws Exception + { + _in.put((byte)0x80); + _in.put((byte)0x06); + _in.put("Hello ".getBytes(StringUtil.__UTF8)); + _in.put((byte)0x00); + _in.put((byte)0x05); + _in.put("World".getBytes(StringUtil.__UTF8)); + + int filled =_parser.parseNext(); + + assertEquals(19,filled); + assertEquals(0,_handler._data.size()); + assertFalse(_parser.isBufferEmpty()); + assertFalse(_parser.getBuffer()==null); + + filled =_parser.parseNext(); + + assertEquals(0,filled); + assertEquals("Hello World",_handler._data.get(0)); + assertTrue(_parser.isBufferEmpty()); + assertTrue(_parser.getBuffer()==null); + } + + + private class Handler implements WebSocketParser.FrameHandler + { + Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + public List _data = new ArrayList(); + + public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer) + { + if (more) + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + else if (_utf8.length()==0) + _data.add(opcode,buffer.toString("utf-8")); + else + { + _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); + _data.add(opcode,_utf8.toString()); + _utf8.reset(); + } + } + } +}