337685 Work in progress on draft 5 websockets

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@2810 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2011-02-22 03:32:07 +00:00
parent 02e75add4f
commit a4f122930a
7 changed files with 1170 additions and 1 deletions

View File

@ -156,7 +156,7 @@ public class WebSocketUpgradeTest extends TestCase
int status = httpExchange.waitForDone(); int status = httpExchange.waitForDone();
assertEquals(HttpExchange.STATUS_COMPLETED, status); assertEquals(HttpExchange.STATUS_COMPLETED, status);
System.err.println("results="+_results); // System.err.println("results="+_results);
assertEquals("serverWS.onConnect", _results.poll(1,TimeUnit.SECONDS)); assertEquals("serverWS.onConnect", _results.poll(1,TimeUnit.SECONDS));
TestWebSocket serverWS = (TestWebSocket)_results.poll(1,TimeUnit.SECONDS); TestWebSocket serverWS = (TestWebSocket)_results.poll(1,TimeUnit.SECONDS);

View File

@ -0,0 +1,94 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
final class FrameHandlerD5 implements WebSocketParser.FrameHandler
{
public final static byte PING=1;
public final static byte PONG=1;
final WebSocketConnectionD05 _connection;
final WebSocket _websocket;
final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
boolean _fragmented=false;
FrameHandlerD5(WebSocketConnectionD05 connection, WebSocket websocket)
{
_connection=connection;
_websocket=websocket;
}
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
{
try
{
byte[] array=buffer.array();
if (opcode==0)
{
if (more)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_fragmented=true;
}
else if (_fragmented)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_websocket.onMessage(opcode,_utf8.toString());
_utf8.reset();
_fragmented=false;
}
else
{
_websocket.onMessage(opcode,buffer.toString("utf-8"));
}
}
else if (opcode==PING)
{
_connection.sendMessage(PONG,buffer.array(),buffer.getIndex(),buffer.length());
}
else if (opcode==PONG)
{
}
else
{
if (more)
{
_websocket.onFragment(true,opcode,array,buffer.getIndex(),buffer.length());
}
else if (_fragmented)
{
_websocket.onFragment(false,opcode,array,buffer.getIndex(),buffer.length());
}
else
{
_websocket.onMessage(opcode,array,buffer.getIndex(),buffer.length());
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
}

View File

@ -0,0 +1,373 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.IndirectNIOBuffer;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.log.Log;
public class WebSocketConnectionD05 implements WebSocketConnection
{
final IdleCheck _idle;
final EndPoint _endp;
final WebSocketParser _parser;
final WebSocketGenerator _generator;
final long _timestamp;
final WebSocket _websocket;
String _key1;
String _key2;
ByteArrayBuffer _hixieBytes;
public WebSocketConnectionD05(WebSocket websocket, EndPoint endpoint,int draft)
throws IOException
{
this(websocket,endpoint,new WebSocketBuffers(8192),System.currentTimeMillis(),300000,draft);
}
public WebSocketConnectionD05(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, int draft)
throws IOException
{
// TODO - can we use the endpoint idle mechanism?
if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle();
_endp = endpoint;
_endp.setMaxIdleTime(maxIdleTime);
_timestamp = timestamp;
_websocket = websocket;
_generator = new WebSocketGeneratorD05(buffers, _endp);
_parser = new WebSocketParserD05(buffers, endpoint, new FrameHandlerD5(this,_websocket),true);
// TODO should these be AsyncEndPoint checks/calls?
if (_endp instanceof SelectChannelEndPoint)
{
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
scep.cancelIdle();
_idle=new IdleCheck()
{
public void access(EndPoint endp)
{
scep.scheduleIdle();
}
};
scep.scheduleIdle();
}
else
{
_idle = new IdleCheck()
{
public void access(EndPoint endp)
{}
};
}
}
public void setHixieKeys(String key1,String key2)
{
_key1=key1;
_key2=key2;
_hixieBytes=new IndirectNIOBuffer(16);
}
public Connection handle() throws IOException
{
try
{
// handle stupid hixie random bytes
if (_hixieBytes!=null)
{
// take any available bytes from the parser buffer, which may have already been read
Buffer buffer=_parser.getBuffer();
if (buffer!=null && buffer.length()>0)
{
int l=buffer.length();
if (l>(8-_hixieBytes.length()))
l=8-_hixieBytes.length();
_hixieBytes.put(buffer.peek(buffer.getIndex(),l));
buffer.skip(l);
}
// while we are not blocked
while(_endp.isOpen())
{
// do we now have enough
if (_hixieBytes.length()==8)
{
// we have the silly random bytes
// so let's work out the stupid 16 byte reply.
doTheHixieHixieShake();
_endp.flush(_hixieBytes);
_hixieBytes=null;
_endp.flush();
break;
}
// no, then let's fill
int filled=_endp.fill(_hixieBytes);
if (filled<0)
{
_endp.close();
break;
}
}
_websocket.onConnect(this);
return this;
}
// handle the framing protocol
boolean progress=true;
while (progress)
{
int flushed=_generator.flush();
int filled=_parser.parseNext();
progress = flushed>0 || filled>0;
if (filled<0 || flushed<0)
{
_endp.close();
break;
}
}
}
catch(IOException e)
{
try
{
_endp.close();
}
catch(IOException e2)
{
Log.ignore(e2);
}
throw e;
}
finally
{
if (_endp.isOpen())
{
_idle.access(_endp);
checkWriteable();
}
}
return this;
}
private void doTheHixieHixieShake()
{
byte[] result=WebSocketConnectionD05.doTheHixieHixieShake(
WebSocketConnectionD05.hixieCrypt(_key1),
WebSocketConnectionD05.hixieCrypt(_key2),
_hixieBytes.asArray());
_hixieBytes.clear();
_hixieBytes.put(result);
}
public boolean isOpen()
{
return _endp!=null&&_endp.isOpen();
}
public boolean isIdle()
{
return _parser.isBufferEmpty() && _generator.isBufferEmpty();
}
public boolean isSuspended()
{
return false;
}
public void closed()
{
_websocket.onDisconnect();
}
public long getTimeStamp()
{
return _timestamp;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(java.lang.String)
*/
public void sendMessage(String content) throws IOException
{
sendMessage(WebSocket.SENTINEL_FRAME,content);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String)
*/
public void sendMessage(byte frame, String content) throws IOException
{
_generator.addFrame(frame,content,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int)
*/
public void sendMessage(byte opcode, byte[] content, int offset, int length) throws IOException
{
_generator.addFrame(opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendFragment(boolean, byte, byte[], int, int)
*/
public void sendFragment(boolean more,byte opcode, byte[] content, int offset, int length) throws IOException
{
_generator.addFragment(more,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
public void disconnect()
{
try
{
_generator.flush(_endp.getMaxIdleTime());
_endp.close();
}
catch(IOException e)
{
Log.ignore(e);
}
}
public void fillBuffersFrom(Buffer buffer)
{
_parser.fill(buffer);
}
private void checkWriteable()
{
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).scheduleWrite();
}
/* ------------------------------------------------------------ */
static long hixieCrypt(String key)
{
// Don't ask me what all this is about.
// I think it's pretend secret stuff, kind of
// like talking in pig latin!
long number=0;
int spaces=0;
for (char c : key.toCharArray())
{
if (Character.isDigit(c))
number=number*10+(c-'0');
else if (c==' ')
spaces++;
}
return number/spaces;
}
public static byte[] doTheHixieHixieShake(long key1,long key2,byte[] key3)
{
try
{
MessageDigest md = MessageDigest.getInstance("MD5");
byte [] fodder = new byte[16];
fodder[0]=(byte)(0xff&(key1>>24));
fodder[1]=(byte)(0xff&(key1>>16));
fodder[2]=(byte)(0xff&(key1>>8));
fodder[3]=(byte)(0xff&key1);
fodder[4]=(byte)(0xff&(key2>>24));
fodder[5]=(byte)(0xff&(key2>>16));
fodder[6]=(byte)(0xff&(key2>>8));
fodder[7]=(byte)(0xff&key2);
for (int i=0;i<8;i++)
fodder[8+i]=key3[i];
md.update(fodder);
byte[] result=md.digest();
return result;
}
catch (NoSuchAlgorithmException e)
{
throw new IllegalStateException(e);
}
}
private interface IdleCheck
{
void access(EndPoint endp);
}
public void handshake(HttpServletRequest request, HttpServletResponse response, String origin, String subprotocol) throws IOException
{
String uri=request.getRequestURI();
String query=request.getQueryString();
if (query!=null && query.length()>0)
uri+="?"+query;
String host=request.getHeader("Host");
String key1 = request.getHeader("Sec-WebSocket-Key1");
if (key1!=null)
{
String key2 = request.getHeader("Sec-WebSocket-Key2");
setHixieKeys(key1,key2);
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
response.addHeader("Sec-WebSocket-Origin",origin);
response.addHeader("Sec-WebSocket-Location",(request.isSecure()?"wss://":"ws://")+host+uri);
if (subprotocol!=null)
response.addHeader("Sec-WebSocket-Protocol",subprotocol);
response.sendError(101,"WebSocket Protocol Handshake");
}
else
{
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
response.addHeader("WebSocket-Origin",origin);
response.addHeader("WebSocket-Location",(request.isSecure()?"wss://":"ws://")+host+uri);
if (subprotocol!=null)
response.addHeader("WebSocket-Protocol",subprotocol);
response.sendError(101,"Web Socket Protocol Handshake");
response.flushBuffer();
_websocket.onConnect(this);
}
}
}

View File

@ -107,6 +107,9 @@ public class WebSocketFactory
final WebSocketConnection connection; final WebSocketConnection connection;
switch(draft) switch(draft)
{ {
case 5:
connection=new WebSocketConnectionD05(websocket,endp,_buffers,http.getTimeStamp(), _maxIdleTime,draft);
break;
default: default:
connection=new WebSocketConnectionD00(websocket,endp,_buffers,http.getTimeStamp(), _maxIdleTime,draft); connection=new WebSocketConnectionD00(websocket,endp,_buffers,http.getTimeStamp(), _maxIdleTime,draft);
} }

View File

@ -0,0 +1,198 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
/* ------------------------------------------------------------ */
/** WebSocketGenerator.
* This class generates websocket packets.
* It is fully synchronized because it is likely that async
* threads will call the addMessage methods while other
* threads are flushing the generator.
*/
public class WebSocketGeneratorD05 implements WebSocketGenerator
{
final private WebSocketBuffers _buffers;
final private EndPoint _endp;
private Buffer _buffer;
public WebSocketGeneratorD05(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;
_endp=endp;
}
public synchronized void addFrame(byte opcode,byte[] content, int blockFor) throws IOException
{
addFrame(opcode,content,0,content.length,blockFor);
}
public synchronized void addFrame(byte opcode,byte[] content, int offset, int length, int blockFor) throws IOException
{
addFragment(false,opcode,content,offset,length,blockFor);
}
public synchronized void addFragment(boolean more, byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
if (_buffer.space() == 0)
expelBuffer(blockFor);
opcode = (byte)(opcode & 0x0f);
while (length>0)
{
// slice a fragment off
int fragment=length;
if (fragment+10>_buffer.capacity())
{
fragment=_buffer.capacity()-10;
bufferPut((byte)(0x80|opcode), blockFor);
}
else if (more)
bufferPut((byte)(0x80|opcode), blockFor);
else
bufferPut(opcode, blockFor);
if (fragment>0xffff)
{
bufferPut((byte)0x7f, blockFor);
bufferPut((byte)((fragment>>56)&0x7f), blockFor);
bufferPut((byte)((fragment>>48)&0xff), blockFor);
bufferPut((byte)((fragment>>40)&0xff), blockFor);
bufferPut((byte)((fragment>>32)&0xff), blockFor);
bufferPut((byte)((fragment>>24)&0xff), blockFor);
bufferPut((byte)((fragment>>16)&0xff), blockFor);
bufferPut((byte)((fragment>>8)&0xff), blockFor);
bufferPut((byte)(fragment&0xff), blockFor);
}
else if (fragment >=0x7e)
{
bufferPut((byte)126, blockFor);
bufferPut((byte)(fragment>>8), blockFor);
bufferPut((byte)(fragment&0xff), blockFor);
}
else
{
bufferPut((byte)fragment, blockFor);
}
int remaining = fragment;
while (remaining > 0)
{
_buffer.compact();
int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
_buffer.put(content, offset + (fragment - remaining), chunk);
remaining -= chunk;
if (_buffer.space() > 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
else
{
// Forcibly flush the data, issuing a blocking write
expelBuffer(blockFor);
if (remaining == 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
}
}
offset+=fragment;
length-=fragment;
}
}
private synchronized void bufferPut(byte datum, long blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
_buffer.put(datum);
if (_buffer.space() == 0)
expelBuffer(blockFor);
}
public synchronized void addFrame(byte frame, String content, int blockFor) throws IOException
{
byte[] bytes = content.getBytes("UTF-8");
addFrame(frame, bytes, 0, bytes.length, blockFor);
}
public synchronized int flush(int blockFor) throws IOException
{
return expelBuffer(blockFor);
}
public synchronized int flush() throws IOException
{
int flushed = flushBuffer();
if (_buffer!=null && _buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return flushed;
}
private synchronized int flushBuffer() throws IOException
{
if (!_endp.isOpen())
throw new EofException();
if (_buffer!=null)
return _endp.flush(_buffer);
return 0;
}
private synchronized int expelBuffer(long blockFor) throws IOException
{
if (_buffer==null)
return 0;
int result = flushBuffer();
_buffer.compact();
if (!_endp.isBlocking())
{
while (_buffer.space()==0)
{
// TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
// TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
boolean ready = _endp.blockWritable(blockFor);
if (!ready)
throw new IOException("Write timeout");
result += flushBuffer();
_buffer.compact();
}
}
return result;
}
public synchronized boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
}

View File

@ -0,0 +1,257 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
/* ------------------------------------------------------------ */
/**
* Parser the WebSocket protocol.
*
*/
public class WebSocketParserD05 implements WebSocketParser
{
public enum State {
MASK(0), OPCODE(1), LENGTH_7(2), LENGTH_16(4), LENGTH_63(10), DATA(10);
int _minSize;
State(int minSize)
{
_minSize=minSize;
}
int getMinSize()
{
return _minSize;
}
};
private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final FrameHandler _handler;
private final boolean _masked;
private State _state;
private Buffer _buffer;
private boolean _more;
private byte _flags;
private byte _opcode;
private int _count;
private long _length;
private Utf8StringBuilder _utf8;
private final byte[] _mask = new byte[4];
private int _m;
/* ------------------------------------------------------------ */
/**
* @param buffers The buffers to use for parsing. Only the {@link Buffers#getBuffer()} is used.
* This should be a direct buffer if binary data is mostly used or an indirect buffer if utf-8 data
* is mostly used.
* @param endp
* @param handler
*/
public WebSocketParserD05(WebSocketBuffers buffers, EndPoint endp, FrameHandler handler, boolean masked)
{
_buffers=buffers;
_endp=endp;
_handler=handler;
_masked=masked;
_state=_masked?State.MASK:State.OPCODE;
}
/* ------------------------------------------------------------ */
public boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
/* ------------------------------------------------------------ */
public Buffer getBuffer()
{
return _buffer;
}
/* ------------------------------------------------------------ */
/** Parse to next event.
* Parse to the next {@link WebSocketParser.FrameHandler} event or until no more data is
* available. Fill data from the {@link EndPoint} only as necessary.
* @return An indication of progress or otherwise. -1 indicates EOF, 0 indicates
* that no bytes were read and no messages parsed. A positive number indicates either
* the bytes filled or the messages parsed.
*/
public int parseNext()
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
int total_filled=0;
// Loop until an datagram call back or can't fill anymore
while(true)
{
int available=_buffer.length();
// Fill buffer if we need a byte or need length
if (available < (_state.getMinSize() + (_masked?4:0)) || _state==State.DATA && available<_count)
{
// compact to mark (set at start of data)
_buffer.compact();
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
throw new IllegalStateException("FULL");
// catch IOExceptions (probably EOF) and try to parse what we have
try
{
int filled=_endp.isOpen()?_endp.fill(_buffer):-1;
if (filled<=0)
return total_filled;
total_filled+=filled;
available=_buffer.length();
}
catch(IOException e)
{
Log.debug(e);
return total_filled>0?total_filled:-1;
}
}
// if we are here, then we have sufficient bytes to process the current state.
// Parse the buffer byte by byte (unless it is STATE_DATA)
byte b;
while (_state!=State.DATA && available-->0)
{
switch (_state)
{
case MASK:
_buffer.get(_mask,0,4);
_state=State.OPCODE;
_m=0;
continue;
case OPCODE:
b=_buffer.get();
if (_masked)
b^=_mask[_m++%4];
_opcode=(byte)(b&0xf);
_flags=(byte)(b>>4);
_more=(_flags&8)!=0;
_state=State.LENGTH_7;
continue;
case LENGTH_7:
b=_buffer.get();
if (_masked)
b^=_mask[_m++%4];
switch(b)
{
case 127:
_length=0;
_count=8;
_state=State.LENGTH_63;
break;
case 126:
_length=0;
_count=2;
_state=State.LENGTH_16;
break;
default:
_length=(0x7f&b);
_count=(int)_length;
_state=State.DATA;
}
continue;
case LENGTH_16:
b=_buffer.get();
if (_masked)
b^=_mask[_m++%4];
_length = _length<<8 | b;
if (--_count==0)
{
if (_length>=_buffer.capacity()-4)
throw new IllegalStateException("TOO LARGE");
_count=(int)_length;
_state=State.DATA;
}
continue;
case LENGTH_63:
b=_buffer.get();
if (_masked)
b^=_mask[_m++%4];
_length = _length<<8 | b;
if (--_count==0)
{
if (_length>=_buffer.capacity()-10)
throw new IllegalStateException("TOO LARGE");
_count=(int)_length;
_state=State.DATA;
}
continue;
}
}
if (_state==State.DATA && available>=_count)
{
Buffer data =_buffer.get(_count);
if (_masked)
{
if (data.array()==null)
data=_buffer.asMutableBuffer();
byte[] array = data.array();
for (int i=data.length();i-->0;)
array[data.getIndex()+i]^=_mask[_m++%4];
}
_handler.onFrame(_more,_flags, _opcode, data);
_count=0;
_state=State.OPCODE;
if (_buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return total_filled;
}
}
}
/* ------------------------------------------------------------ */
public void fill(Buffer buffer)
{
if (buffer!=null && buffer.length()>0)
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
_buffer.put(buffer);
buffer.clear();
}
}
}

View File

@ -0,0 +1,244 @@
package org.eclipse.jetty.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http.HttpHeaderValues;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketParserD05Test
{
private ByteArrayBuffer _in;
private Handler _handler;
private WebSocketParser _parser;
private byte[] _mask = new byte[] {(byte)0xff,(byte)0xff,(byte)0xff,(byte)0xff};
@Before
public void setUp() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
_handler = new Handler();
_parser=new WebSocketParserD05(buffers, endPoint,_handler,true);
_in = new ByteArrayBuffer(2048)
{
{
// add the mask
super.put(_mask,0,4);
}
@Override
public void poke(int index, byte b)
{
super.poke(index,(byte)(b^_mask[index%4]));
}
@Override
public int poke(int index, Buffer src)
{
return poke(index,src.asArray(),0,src.length());
}
@Override
public int poke(int index, byte[] b, int offset, int length)
{
byte[] mb = new byte[b.length];
for (int i=length;i-->0;)
mb[offset+i]=(byte)(b[offset+i]^_mask[(index+i)%4]);
return super.poke(index,mb,offset,length);
}
@Override
public int put(Buffer src)
{
return put(src.asArray(),0,src.length());
}
@Override
public void put(byte b)
{
super.put((byte)(b^_mask[getIndex()%4]));
}
@Override
public int put(byte[] b, int offset, int length)
{
byte[] mb = new byte[b.length];
for (int i=length;i-->0;)
mb[offset+i]=(byte)(b[offset+i]^_mask[(getIndex()+i)%4]);
return super.put(mb,offset,length);
}
@Override
public int put(byte[] b)
{
return put(b,0,b.length);
}
};
endPoint.setIn(_in);
}
@Test
public void testCache() throws Exception
{
assertEquals(HttpHeaderValues.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValues.CACHE.lookup("Upgrade")).getOrdinal());
}
@Test
public void testShortText() throws Exception
{
_in.put((byte)0x00);
_in.put((byte)11);
_in.put("Hello World".getBytes(StringUtil.__UTF8));
int filled =_parser.parseNext();
assertEquals(17,filled);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortUtf8() throws Exception
{
String string = "Hell\uFF4f W\uFF4Frld";
byte[] bytes = string.getBytes("UTF-8");
_in.put((byte)0x00);
_in.put((byte)bytes.length);
_in.put(bytes);
int filled =_parser.parseNext();
assertEquals(bytes.length+6,filled);
assertEquals(string,_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testMediumText() throws Exception
{
String string = "Hell\uFF4f Medium W\uFF4Frld ";
for (int i=0;i<4;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
_in.put((byte)0x00);
_in.put((byte)0x7E);
_in.put((byte)(bytes.length>>8));
_in.put((byte)(bytes.length&0xff));
_in.put(bytes);
int filled =_parser.parseNext();
assertEquals(bytes.length+8,filled);
assertEquals(string,_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testLongText() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(0x20000);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
WebSocketParser parser=new WebSocketParserD01(buffers, endPoint,_handler);
ByteArrayBuffer in = new ByteArrayBuffer(0x20000);
endPoint.setIn(in);
String string = "Hell\uFF4f Big W\uFF4Frld ";
for (int i=0;i<12;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
in.put((byte)0x00);
in.put((byte)0x7F);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)(bytes.length>>16));
in.put((byte)((bytes.length>>8)&0xff));
in.put((byte)(bytes.length&0xff));
in.put(bytes);
int filled =parser.parseNext();
assertEquals(bytes.length+10,filled);
assertEquals(string,_handler._data.get(0));
assertTrue(parser.isBufferEmpty());
assertTrue(parser.getBuffer()==null);
}
@Test
public void testShortFragmentTest() throws Exception
{
_in.put((byte)0x80);
_in.put((byte)0x06);
_in.put("Hello ".getBytes(StringUtil.__UTF8));
_in.put((byte)0x00);
_in.put((byte)0x05);
_in.put("World".getBytes(StringUtil.__UTF8));
int filled =_parser.parseNext();
assertEquals(19,filled);
assertEquals(0,_handler._data.size());
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
filled =_parser.parseNext();
assertEquals(0,filled);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
public List<String> _data = new ArrayList<String>();
public void onFrame(boolean more, byte flags, byte opcode, Buffer buffer)
{
if (more)
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
else if (_utf8.length()==0)
_data.add(opcode,buffer.toString("utf-8"));
else
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_data.add(opcode,_utf8.toString());
_utf8.reset();
}
}
}
}