Websocket partially refactored and passing tests

This commit is contained in:
Greg Wilkins 2011-10-18 21:59:49 +11:00
parent 6edb7c1930
commit 49f8b0de1a
12 changed files with 248 additions and 161 deletions

View File

@ -40,57 +40,48 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
{
try
{
int no_progress = 0;
boolean progress=true;
boolean failed = false;
while (_endp.isBufferingInput() || _endp.isOpen())
// While the endpoint is open
// AND we have more characters to read OR we made some progress
while (_endp.isOpen() &&
(_parser.isMoreInBuffer() || _endp.isBufferingInput() || progress))
{
// If no exchange, skipCRLF or close on unexpected characters
HttpExchange exchange;
synchronized (this)
{
while (_exchange == null)
exchange=_exchange;
}
if (exchange == null)
{
long filled = _parser.fill();
if (filled < 0)
close();
else
{
if (_endp.isBlocking())
// Hopefully just space?
_parser.skipCRLF();
if (_parser.isMoreInBuffer())
{
try
{
this.wait();
}
catch (InterruptedException e)
{
throw new InterruptedIOException();
}
}
else
{
long filled = _parser.fill();
if (filled < 0)
{
close();
}
else
{
// Hopefully just space?
_parser.skipCRLF();
if (_parser.isMoreInBuffer())
{
LOG.warn("Unexpected data received but no request sent");
close();
}
}
return this;
LOG.warn("Unexpected data received but no request sent");
close();
}
}
return this;
}
try
{
if (_exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
if (exchange.getStatus() == HttpExchange.STATUS_WAITING_FOR_COMMIT)
{
no_progress = 0;
progress=true;
commitRequest();
}
long io = 0;
_endp.flush();
if (_generator.isComplete())
@ -98,47 +89,38 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
if (!_requestComplete)
{
_requestComplete = true;
_exchange.getEventListener().onRequestComplete();
exchange.getEventListener().onRequestComplete();
}
}
else
{
// Write as much of the request as possible
synchronized (this)
{
if (_exchange == null)
continue;
}
long flushed = _generator.flushBuffer();
io += flushed;
if (!_generator.isComplete())
progress|=(flushed>0);
if (_generator.isComplete())
{
if (_exchange!=null)
InputStream in = exchange.getRequestContentSource();
if (in != null)
{
InputStream in = _exchange.getRequestContentSource();
if (in != null)
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
{
if (_requestContentChunk == null || _requestContentChunk.length() == 0)
{
_requestContentChunk = _exchange.getRequestContentChunk();
_requestContentChunk = _exchange.getRequestContentChunk();
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
else
_generator.complete();
if (_requestContentChunk != null)
_generator.addContent(_requestContentChunk,false);
else
_generator.complete();
flushed = _generator.flushBuffer();
io += flushed;
}
flushed = _generator.flushBuffer();
progress|=(flushed>0);
}
else
_generator.complete();
}
else
_generator.complete();
}
else
_generator.complete();
}
if (_generator.isComplete() && !_requestComplete)
@ -151,25 +133,12 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
if (!_parser.isComplete() && (_generator.isComplete() || _generator.isCommitted() && !_endp.isBlocking()))
{
if (_parser.parseAvailable())
io++;
progress=true;
if (_parser.isIdle() && (_endp.isInputShutdown() || !_endp.isOpen()))
throw new EOFException();
}
if (io > 0)
no_progress = 0;
else if (no_progress++ >= 1 && !_endp.isBlocking())
{
// SSL may need an extra flush as it may have made "no progress" while actually doing a handshake.
if (_endp instanceof SslSelectChannelEndPoint && !_generator.isComplete() && !_generator.isEmpty())
{
long flushed = _generator.flushBuffer();
if (flushed>0)
continue;
}
return this;
}
}
catch (Throwable e)
{
@ -270,10 +239,10 @@ public class AsyncHttpConnection extends AbstractHttpConnection implements Async
reset(true);
no_progress = 0;
progress=true;
if (_exchange != null)
{
HttpExchange exchange=_exchange;
exchange=_exchange;
_exchange = null;
// Reset the maxIdleTime because it may have been changed

View File

@ -579,8 +579,9 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
if (next!=_connection)
{
LOG.debug("{} replaced {}",next,_connection);
Connection old=_connection;
_connection=next;
_manager.endPointUpgraded(this,_connection);
_manager.endPointUpgraded(this,old);
continue;
}
break;

View File

@ -82,7 +82,6 @@ public class AsyncHttpConnection extends HttpConnection implements AsyncConnecti
{
// Reset the parser/generator
progress=true;
reset();
// look for a switched connection instance?
if (_response.getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
@ -91,11 +90,13 @@ public class AsyncHttpConnection extends HttpConnection implements AsyncConnecti
if (switched!=null)
connection=switched;
}
reset();
// TODO Is this required?
if (!_generator.isPersistent() && !_endp.isOutputShutdown())
{
System.err.println("Safety net oshut!!!");
LOG.warn("Safety net oshut!!!");
_endp.shutdownOutput();
}
}

View File

@ -540,9 +540,9 @@ public class StdErrLog implements Logger
{
StdErrLog sel = new StdErrLog(fullname);
// Preserve configuration for new loggers configuration
//sel.setPrintLongNames(_printLongNames);
//sel.setLevel(_level);
//sel.setSource(_source);
sel.setPrintLongNames(_printLongNames);
// Let Level come from configured Properties instead - sel.setLevel(_level);
sel.setSource(_source);
logger = __loggers.putIfAbsent(fullname,sel);
if (logger == null)
{

View File

@ -227,7 +227,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
throw new IllegalStateException();
LOG.debug("upgrade {} -> {}",oldConnection,endpoint.getConnection());
}
@Override
@ -364,7 +364,6 @@ public class WebSocketClientFactory extends AggregateLifeCycle
{
future.handshakeFailed(e);
}
}
public Connection handle() throws IOException
@ -388,7 +387,15 @@ public class WebSocketClientFactory extends AggregateLifeCycle
{
Buffer header=_parser.getHeaderBuffer();
MaskGen maskGen=_future.getMaskGen();
WebSocketConnectionD13 connection = new WebSocketConnectionD13(_future.getWebSocket(),_endp,_buffers,System.currentTimeMillis(),_future.getMaxIdleTime(),_future.getProtocol(),null,10,maskGen);
WebSocketConnectionD13 connection =
new WebSocketConnectionD13(_future.getWebSocket(),
_endp,
_buffers,System.currentTimeMillis(),
_future.getMaxIdleTime(),
_future.getProtocol(),
null,
WebSocketConnectionD13.VERSION,
maskGen);
if (header.hasContent())
connection.fillBuffersFrom(header);
@ -406,7 +413,7 @@ public class WebSocketClientFactory extends AggregateLifeCycle
public void onInputShutdown() throws IOException
{
// TODO
_endp.close();
}
public boolean isIdle()
@ -426,5 +433,10 @@ public class WebSocketClientFactory extends AggregateLifeCycle
else
_future.handshakeFailed(new EOFException());
}
public String toString()
{
return "HS"+super.toString();
}
}
}

View File

@ -40,6 +40,30 @@ import org.eclipse.jetty.websocket.WebSocket.OnControl;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
import org.eclipse.jetty.websocket.WebSocket.OnTextMessage;
/* ------------------------------------------------------------ */
/**
* <pre>
* 0 1 2 3
* 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
* +-+-+-+-+-------+-+-------------+-------------------------------+
* |F|R|R|R| opcode|M| Payload len | Extended payload length |
* |I|S|S|S| (4) |A| (7) | (16/64) |
* |N|V|V|V| |S| | (if payload len==126/127) |
* | |1|2|3| |K| | |
* +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
* | Extended payload length continued, if payload len == 127 |
* + - - - - - - - - - - - - - - - +-------------------------------+
* | |Masking-key, if MASK set to 1 |
* +-------------------------------+-------------------------------+
* | Masking-key (continued) | Payload Data |
* +-------------------------------- - - - - - - - - - - - - - - - +
* : Payload Data continued ... :
* + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
* | Payload Data continued ... |
* +---------------------------------------------------------------+
* </pre>
*/
public class WebSocketConnectionD13 extends AbstractConnection implements WebSocketConnection
{
private static final Logger LOG = Log.getLogger(WebSocketConnectionD13.class);
@ -225,12 +249,6 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
int filled=_parser.parseNext();
progress = flushed>0 || filled>0;
if (filled<0 || flushed<0)
{
_endp.close();
break;
}
}
}
catch(IOException e)
@ -267,7 +285,8 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */
public void onInputShutdown() throws IOException
{
// TODO
if (!_closedIn)
_endp.close();
}
/* ------------------------------------------------------------ */
@ -309,11 +328,11 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
{
LOG.debug("ClosedIn {} {}",this,message);
final boolean close;
final boolean closed_out;
final boolean tell_app;
synchronized (this)
{
close=_closedOut;
closed_out=_closedOut;
_closedIn=true;
tell_app=_closeCode==0;
if (tell_app)
@ -330,17 +349,8 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
}
finally
{
try
{
if (close)
_endp.close();
else
closeOut(code,message);
}
catch(IOException e)
{
LOG.ignore(e);
}
if (!closed_out)
closeOut(code,message);
}
}
@ -349,13 +359,11 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
{
LOG.debug("ClosedOut {} {}",this,message);
final boolean close;
final boolean closed_out;
final boolean tell_app;
final boolean send_close;
synchronized (this)
{
close=_closedIn;
send_close=!_closedOut;
closed_out=_closedOut;
_closedOut=true;
tell_app=_closeCode==0;
if (tell_app)
@ -374,21 +382,16 @@ public class WebSocketConnectionD13 extends AbstractConnection implements WebSoc
{
try
{
if (send_close)
if (!closed_out)
{
if (code<=0)
code=WebSocketConnectionD13.CLOSE_NORMAL;
byte[] bytes = ("xx"+(message==null?"":message)).getBytes(StringUtil.__ISO_8859_1);
byte[] bytes = (message==null?"xx":("xx"+message)).getBytes(StringUtil.__ISO_8859_1);
bytes[0]=(byte)(code/0x100);
bytes[1]=(byte)(code%0x100);
_outbound.addFrame((byte)FLAG_FIN,WebSocketConnectionD13.OP_CLOSE,bytes,0,bytes.length);
_outbound.flush();
if (close)
_endp.shutdownOutput();
}
else if (close)
_endp.close();
}
catch(IOException e)
{

View File

@ -205,6 +205,7 @@ public class WebSocketFactory
extensions_requested.add(tok.nextToken());
}
final WebSocketConnection connection;
final List<Extension> extensions;
switch (draft)
@ -251,6 +252,7 @@ public class WebSocketFactory
connection.fillBuffersFrom(((HttpParser)http.getParser()).getBodyBuffer());
// Tell jetty about the new connection
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),draft,protocol,connection);
request.setAttribute("org.eclipse.jetty.io.Connection", connection);
}

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.TypeUtil;
/* ------------------------------------------------------------ */
@ -36,6 +37,7 @@ public class WebSocketGeneratorD13 implements WebSocketGenerator
private int _m;
private boolean _opsent;
private final MaskGen _maskGen;
private boolean _closed;
public WebSocketGeneratorD13(WebSocketBuffers buffers, EndPoint endp)
{
@ -60,6 +62,11 @@ public class WebSocketGeneratorD13 implements WebSocketGenerator
{
// System.err.printf("<< %s %s %s\n",TypeUtil.toHexString(flags),TypeUtil.toHexString(opcode),length);
if (_closed)
throw new EofException("Closed");
if (opcode==WebSocketConnectionD13.OP_CLOSE)
_closed=true;
boolean mask=_maskGen!=null;
if (_buffer==null)
@ -131,7 +138,6 @@ public class WebSocketGeneratorD13 implements WebSocketGenerator
_buffer.put(_mask);
}
// write payload
int remaining = payload;
while (remaining > 0)
@ -183,7 +189,12 @@ public class WebSocketGeneratorD13 implements WebSocketGenerator
throw new EofException();
if (_buffer!=null)
return _endp.flush(_buffer);
{
int flushed=_endp.flush(_buffer);
if (_closed&&_buffer.length()==0)
_endp.shutdownOutput();
return flushed;
}
return 0;
}

View File

@ -18,6 +18,7 @@ import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
@ -34,7 +35,7 @@ public class WebSocketParserD13 implements WebSocketParser
public enum State {
START(0), OPCODE(1), LENGTH_7(1), LENGTH_16(2), LENGTH_63(8), MASK(4), PAYLOAD(0), DATA(0), SKIP(1);
START(0), OPCODE(1), LENGTH_7(1), LENGTH_16(2), LENGTH_63(8), MASK(4), PAYLOAD(0), DATA(0), SKIP(1), SEEK_EOF(1);
int _needs;
@ -125,11 +126,12 @@ public class WebSocketParserD13 implements WebSocketParser
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
int total_filled=0;
int events=0;
boolean progress=false;
int filled=-1;
// Loop until a datagram call back or can't fill anymore
while(true)
while(!progress && (!_endp.isInputShutdown()||_endp.isBufferingInput()||_buffer.length()>0))
{
int available=_buffer.length();
@ -158,35 +160,38 @@ public class WebSocketParserD13 implements WebSocketParser
}
// System.err.printf("%s %s %s >>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length());
events++;
_bytesNeeded-=data.length();
progress=true;
_handler.onFrame((byte)(_flags&(0xff^WebSocketConnectionD13.FLAG_FIN)), _opcode, data);
_opcode=WebSocketConnectionD13.OP_CONTINUATION;
}
if (_buffer.space() == 0)
throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity());
throw new IllegalStateException("FULL: "+_state+" "+_bytesNeeded+">"+_buffer.capacity());
}
// catch IOExceptions (probably EOF) and try to parse what we have
try
{
int filled=_endp.isOpen()?_endp.fill(_buffer):-1;
if (filled<=0)
return (total_filled+events)>0?(total_filled+events):filled;
total_filled+=filled;
filled=_endp.isInputShutdown()?-1:_endp.fill(_buffer);
available=_buffer.length();
// System.err.printf(">> filled %d/%d%n",filled,available);
if (filled<=0)
break;
}
catch(IOException e)
{
LOG.debug(e);
return (total_filled+events)>0?(total_filled+events):-1;
filled=-1;
break;
}
}
// Did we get enough?
if (available<(_state==State.SKIP?1:_bytesNeeded))
break;
// if we are here, then we have sufficient bytes to process the current state.
// Parse the buffer byte by byte (unless it is STATE_DATA)
byte b;
while (_state!=State.DATA && available>=(_state==State.SKIP?1:_bytesNeeded))
@ -195,7 +200,7 @@ public class WebSocketParserD13 implements WebSocketParser
{
case START:
_skip=false;
_state=State.OPCODE;
_state=_opcode==WebSocketConnectionD13.OP_CLOSE?State.SEEK_EOF:State.OPCODE;
_bytesNeeded=_state.getNeeds();
continue;
@ -207,9 +212,9 @@ public class WebSocketParserD13 implements WebSocketParser
if (WebSocketConnectionD13.isControlFrame(_opcode)&&!WebSocketConnectionD13.isLastFrame(_flags))
{
events++;
LOG.warn("Fragmented Control from "+_endp);
_handler.close(WebSocketConnectionD13.CLOSE_PROTOCOL,"Fragmented control");
progress=true;
_skip=true;
}
@ -249,7 +254,7 @@ public class WebSocketParserD13 implements WebSocketParser
{
if (_length>_buffer.capacity() && !_fragmentFrames)
{
events++;
progress=true;
_handler.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"frame size "+_length+">"+_buffer.capacity());
_skip=true;
}
@ -268,7 +273,7 @@ public class WebSocketParserD13 implements WebSocketParser
_bytesNeeded=(int)_length;
if (_length>=_buffer.capacity() && !_fragmentFrames)
{
events++;
progress=true;
_handler.close(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,"frame size "+_length+">"+_buffer.capacity());
_skip=true;
}
@ -296,12 +301,19 @@ public class WebSocketParserD13 implements WebSocketParser
case SKIP:
int skip=Math.min(available,_bytesNeeded);
progress=true;
_buffer.skip(skip);
available-=skip;
_bytesNeeded-=skip;
if (_bytesNeeded==0)
_state=State.START;
break;
case SEEK_EOF:
progress=true;
_buffer.skip(available);
available=0;
break;
}
}
@ -311,7 +323,7 @@ public class WebSocketParserD13 implements WebSocketParser
{
_buffer.skip(_bytesNeeded);
_state=State.START;
events++;
progress=true;
_handler.close(WebSocketConnectionD13.CLOSE_PROTOCOL,"Not masked");
}
else
@ -328,15 +340,18 @@ public class WebSocketParserD13 implements WebSocketParser
}
// System.err.printf("%s %s %s >>\n",TypeUtil.toHexString(_flags),TypeUtil.toHexString(_opcode),data.length());
events++;
progress=true;
_handler.onFrame(_flags, _opcode, data);
_bytesNeeded=0;
_state=State.START;
}
return total_filled+events;
break;
}
}
return progress?1:filled;
}
/* ------------------------------------------------------------ */

View File

@ -1,9 +1,11 @@
package org.eclipse.jetty.websocket;
import static junit.framework.Assert.assertEquals;
import static junit.framework.Assert.assertTrue;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.StringUtil;
import org.junit.Before;
import org.junit.Test;
@ -196,5 +198,41 @@ public class WebSocketGeneratorD13Test
for (int i=0;i<b.length;i++)
assertEquals('0'+(i%10),0xff&getMasked());
}
@Test
public void testClose() throws Exception
{
_generator = new WebSocketGeneratorD13(_buffers, _endPoint,null);
byte[] data = "xxGame Over".getBytes(StringUtil.__UTF8);
data[0]=(byte)(1000/0x100);
data[1]=(byte)(1000%0x100);
_generator.addFrame((byte)0x8,(byte)0x08,data,0,data.length);
_generator.flush();
assertEquals((byte)0x88,_out.get());
assertEquals(11,0xff&_out.get());
_out.get();
_out.get();
assertEquals('G',_out.get());
assertEquals('a',_out.get());
assertEquals('m',_out.get());
assertEquals('e',_out.get());
assertEquals(' ',_out.get());
assertEquals('O',_out.get());
assertEquals('v',_out.get());
assertEquals('e',_out.get());
assertEquals('r',_out.get());
try
{
_generator.addFrame((byte)0x8,(byte)0x04,data,0,data.length);
assertTrue(false);
}
catch(EofException e)
{
}
assertTrue(_endPoint.isOutputShutdown());
}
}

View File

@ -65,7 +65,7 @@ public class WebSocketMessageD13Test
}
};
wsHandler.getWebSocketFactory().setBufferSize(8192);
wsHandler.getWebSocketFactory().setMaxIdleTime(1000);
wsHandler.getWebSocketFactory().setMaxIdleTime(1000);
wsHandler.setHandler(new DefaultHandler());
__server.setHandler(wsHandler);
__server.start();
@ -390,7 +390,7 @@ public class WebSocketMessageD13Test
output.write(bytes[i]^0xff);
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();

View File

@ -22,6 +22,7 @@ import org.junit.Test;
*/
public class WebSocketParserD13Test
{
private ByteArrayEndPoint _endPoint;
private MaskedByteArrayBuffer _in;
private Handler _handler;
private WebSocketParserD13 _parser;
@ -84,14 +85,14 @@ public class WebSocketParserD13Test
public void setUp() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
endPoint.setNonBlocking(true);
_endPoint = new ByteArrayEndPoint();
_endPoint.setNonBlocking(true);
_handler = new Handler();
_parser=new WebSocketParserD13(buffers, endPoint,_handler,true);
_parser=new WebSocketParserD13(buffers, _endPoint,_handler,true);
_parser.setFakeFragments(false);
_in = new MaskedByteArrayBuffer();
endPoint.setIn(_in);
_endPoint.setIn(_in);
}
@Test
@ -128,7 +129,7 @@ public class WebSocketParserD13Test
int progress =_parser.parseNext();
assertEquals(18,progress);
assertTrue(progress>0);
assertEquals("Hello World",_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
@ -150,7 +151,7 @@ public class WebSocketParserD13Test
int progress =_parser.parseNext();
assertEquals(bytes.length+7,progress);
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
@ -178,7 +179,7 @@ public class WebSocketParserD13Test
int progress =_parser.parseNext();
assertEquals(bytes.length+9,progress);
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0));
assertEquals(0x8,_handler._flags);
assertEquals(0x1,_handler._opcode);
@ -219,7 +220,7 @@ public class WebSocketParserD13Test
int progress =parser.parseNext();
parser.returnBuffer();
assertEquals(bytes.length+11,progress);
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0));
assertTrue(parser.isBufferEmpty());
assertTrue(parser.getBuffer()==null);
@ -239,7 +240,7 @@ public class WebSocketParserD13Test
int progress =_parser.parseNext();
assertEquals(24,progress);
assertTrue(progress>0);
assertEquals(0,_handler._data.size());
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
@ -247,7 +248,7 @@ public class WebSocketParserD13Test
progress =_parser.parseNext();
_parser.returnBuffer();
assertEquals(1,progress);
assertTrue(progress>0);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
@ -268,18 +269,18 @@ public class WebSocketParserD13Test
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(WebSocketConnectionD13.CLOSE_POLICY_VIOLATION,_handler._code);
for (int i=0;i<2048;i++)
_in.put((byte)'a');
progress =_parser.parseNext();
assertEquals(2048,progress);
assertTrue(progress>0);
assertEquals(0,_handler._data.size());
assertEquals(0,_handler._utf8.length());
_handler._code=0;
_handler._message=null;
_in.putUnmasked((byte)0x81);
_in.putUnmasked((byte)0xFE);
@ -291,8 +292,7 @@ public class WebSocketParserD13Test
progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(1,_handler._data.size());
assertEquals(1024,_handler._data.get(0).length());
assertEquals(0,_handler._data.size());
}
@Test
@ -323,6 +323,43 @@ public class WebSocketParserD13Test
assertEquals(('a'+i%26),mesg.charAt(i));
}
@Test
public void testClose() throws Exception
{
String string = "Game Over";
byte[] bytes = string.getBytes("UTF-8");
_in.putUnmasked((byte)(0x80|0x08));
_in.putUnmasked((byte)(0x80|(2+bytes.length)));
_in.sendMask();
_in.put((byte)(1000/0x100));
_in.put((byte)(1000%0x100));
_in.put(bytes);
int progress =_parser.parseNext();
assertTrue(progress>0);
assertEquals(string,_handler._data.get(0).substring(2));
assertEquals(0x8,_handler._flags);
assertEquals(0x8,_handler._opcode);
_parser.returnBuffer();
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
_in.clear();
_in.put(bytes);
_endPoint.setIn(_in);
progress =_parser.parseNext();
assertTrue(progress>0);
_endPoint.shutdownInput();
progress =_parser.parseNext();
assertEquals(-1,progress);
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
@ -330,7 +367,6 @@ public class WebSocketParserD13Test
private byte _flags;
private byte _opcode;
int _code;
String _message;
int _frames;
public void onFrame(byte flags, byte opcode, Buffer buffer)
@ -353,7 +389,6 @@ public class WebSocketParserD13Test
public void close(int code,String message)
{
_code=code;
_message=message;
}
}
}