342700 refine websocket API for anticipated changes

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@3013 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2011-04-13 11:05:35 +00:00
parent 07462afa38
commit 58652d8f09
31 changed files with 722 additions and 1897 deletions

View File

@ -1,5 +1,6 @@
jetty-7.4.0-SNAPSHOT jetty-7.4.0-SNAPSHOT
+ 342504 Scanner Listener + 342504 Scanner Listener
+ 342700 refine websocket API for anticipated changes
+ JETTY-1362 Set root cause of UnavailableException + JETTY-1362 Set root cause of UnavailableException
+ Various test harness cleanups to avoid random failures + Various test harness cleanups to avoid random failures

View File

@ -29,6 +29,7 @@ import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpSchemes; import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.http.HttpVersions; import org.eclipse.jetty.http.HttpVersions;
import org.eclipse.jetty.io.AbstractConnection;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.Buffers;
@ -44,7 +45,7 @@ import org.eclipse.jetty.util.thread.Timeout;
* *
* @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $ * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $
*/ */
public class HttpConnection /* extends AbstractConnection */ implements Connection public class HttpConnection extends AbstractConnection
{ {
private HttpDestination _destination; private HttpDestination _destination;
private HttpGenerator _generator; private HttpGenerator _generator;
@ -75,8 +76,7 @@ public class HttpConnection /* extends AbstractConnection */ implements Connecti
HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp) HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp)
{ {
_endp=endp; super(endp);
_timeStamp = System.currentTimeMillis();
_generator = new HttpGenerator(requestBuffers,endp); _generator = new HttpGenerator(requestBuffers,endp);
_parser = new HttpParser(responseBuffers,endp,new Handler()); _parser = new HttpParser(responseBuffers,endp,new Handler());
@ -431,7 +431,7 @@ public class HttpConnection /* extends AbstractConnection */ implements Connecti
if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint) if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint)
{ {
// Assume we are write blocked! // Assume we are write blocked!
((AsyncEndPoint)_endp).setWritable(false); ((AsyncEndPoint)_endp).scheduleWrite();
} }
} }
@ -727,18 +727,4 @@ public class HttpConnection /* extends AbstractConnection */ implements Connecti
} }
} }
} }
// TODO remove and use AbstractConnection for 7.4
private final long _timeStamp;
protected final EndPoint _endp;
public long getTimeStamp()
{
return _timeStamp;
}
public EndPoint getEndPoint()
{
return _endp;
}
} }

View File

@ -77,11 +77,11 @@ public class WebSocketUpgradeTest extends TestCase
{ {
Connection _connection; Connection _connection;
public void onDisconnect(int closeCode, String message) public void onClose(int closeCode, String message)
{ {
} }
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
_connection=connection; _connection=connection;
_results.add("clientWS.onConnect"); _results.add("clientWS.onConnect");
@ -118,11 +118,11 @@ public class WebSocketUpgradeTest extends TestCase
protected Connection onSwitchProtocol(EndPoint endp) throws IOException protected Connection onSwitchProtocol(EndPoint endp) throws IOException
{ {
waitFor(3); waitFor(3);
WebSocketConnectionD00 connection = new WebSocketConnectionD00(clientWS,endp,new WebSocketBuffers(4096),System.currentTimeMillis(),1000,"",0); WebSocketConnectionD00 connection = new WebSocketConnectionD00(clientWS,endp,new WebSocketBuffers(4096),System.currentTimeMillis(),1000,"");
_results.add("onSwitchProtocol"); _results.add("onSwitchProtocol");
_results.add(connection); _results.add(connection);
clientWS.onConnect(connection); clientWS.onOpen(connection);
return connection; return connection;
} }
@ -215,7 +215,7 @@ public class WebSocketUpgradeTest extends TestCase
{ {
Connection _connection; Connection _connection;
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
_connection=connection; _connection=connection;
_webSockets.add(this); _webSockets.add(this);
@ -229,7 +229,7 @@ public class WebSocketUpgradeTest extends TestCase
_results.add(data); _results.add(data);
} }
public void onDisconnect(int code, String message) public void onClose(int code, String message)
{ {
_results.add("onDisconnect"); _results.add("onDisconnect");
_webSockets.remove(this); _webSockets.remove(this);

View File

@ -5,7 +5,7 @@ import java.io.IOException;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
public abstract class AbstractConnection implements Connection, Idleable public abstract class AbstractConnection implements Connection
{ {
private final long _timeStamp; private final long _timeStamp;
protected final EndPoint _endp; protected final EndPoint _endp;
@ -48,4 +48,4 @@ public abstract class AbstractConnection implements Connection, Idleable
{ {
return super.toString()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); return super.toString()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
} }
} }

View File

@ -28,17 +28,6 @@ public interface AsyncEndPoint extends EndPoint
*/ */
public boolean isReadyForDispatch(); public boolean isReadyForDispatch();
/* ------------------------------------------------------------ */
/** Set the writable status.
* The writable status is considered next time the async scheduling
* is calculated.
*
* @param writable true if the endpoint is known to be writable or false
* if it is known to not be writable.
*/
public void setWritable(boolean writable);
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** Schedule a write dispatch. /** Schedule a write dispatch.
* Set the endpoint to not be writable and schedule a dispatch when * Set the endpoint to not be writable and schedule a dispatch when

View File

@ -51,4 +51,9 @@ public interface Connection
* Called when the connection is closed * Called when the connection is closed
*/ */
void closed(); void closed();
/**
* Called when the connection idle timeout expires
*/
void idleExpired();
} }

View File

@ -1,27 +0,0 @@
// ========================================================================
// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.io;
/* ------------------------------------------------------------ */
/** Idleable.
* @deprecated Merge this into Connection at the next point release
*/
public interface Idleable
{
/**
* Called when the connection idle timeout expires
*/
void idleExpired();
}

View File

@ -24,7 +24,6 @@ import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.Idleable;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -52,7 +51,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
private boolean _writeBlocked; private boolean _writeBlocked;
private boolean _open; private boolean _open;
private volatile long _idleTimestamp; private volatile long _idleTimestamp;
private boolean _changing=false;
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime)
@ -199,7 +197,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{ {
_dispatched = false; _dispatched = false;
Log.warn("Dispatched Failed! "+this+" to "+_manager); Log.warn("Dispatched Failed! "+this+" to "+_manager);
new Throwable().printStackTrace();
updateKey(); updateKey();
} }
} }
@ -250,21 +247,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
protected void idleExpired() protected void idleExpired()
{ {
if (_connection instanceof Idleable) _connection.idleExpired();
{
((Idleable)_connection).idleExpired();
}
else
{
try
{
shutdownOutput();
}
catch(IOException e)
{
Log.ignore(e);
}
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -274,14 +257,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{ {
int l = super.flush(header, buffer, trailer); int l = super.flush(header, buffer, trailer);
if (!(_writable=l!=0))
// If there was something to write and it wasn't written, then we are not writable.
if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent()))
{ {
synchronized (this) synchronized (this)
{ {
_writable=false;
if (!_dispatched) if (!_dispatched)
updateKey(); updateKey();
} }
} }
else
_writable=true;
return l; return l;
} }
@ -292,14 +280,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
public int flush(Buffer buffer) throws IOException public int flush(Buffer buffer) throws IOException
{ {
int l = super.flush(buffer); int l = super.flush(buffer);
if (!(_writable=l!=0))
// If there was something to write and it wasn't written, then we are not writable.
if (l==0 && buffer!=null && buffer.hasContent())
{ {
synchronized (this) synchronized (this)
{ {
_writable=false;
if (!_dispatched) if (!_dispatched)
updateKey(); updateKey();
} }
} }
else
_writable=true;
return l; return l;
} }
@ -390,13 +384,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
} }
return true; return true;
} }
/* ------------------------------------------------------------ */
public void setWritable(boolean writable)
{
_writable=writable;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void scheduleWrite() public void scheduleWrite()
{ {
@ -433,7 +421,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
if(_interestOps == ops && getChannel().isOpen()) if(_interestOps == ops && getChannel().isOpen())
return; return;
_changing=true;
} }
_selectSet.addChange(this); _selectSet.addChange(this);
_selectSet.wakeup(); _selectSet.wakeup();
@ -447,7 +434,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
{ {
synchronized (this) synchronized (this)
{ {
_changing=false;
if (getChannel().isOpen()) if (getChannel().isOpen())
{ {
if (_interestOps>0) if (_interestOps>0)
@ -529,6 +515,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo
final Connection next = _connection.handle(); final Connection next = _connection.handle();
if (next!=_connection) if (next!=_connection)
{ {
Log.debug("{} replaced {}",next,_connection);
_connection=next; _connection=next;
continue; continue;
} }

View File

@ -516,7 +516,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
Object att = key.attachment(); Object att = key.attachment();
if (att instanceof SelectChannelEndPoint) if (att instanceof SelectChannelEndPoint)
{ {
((SelectChannelEndPoint)att).schedule(); if (key.isReadable()||key.isWritable())
((SelectChannelEndPoint)att).schedule();
} }
else if (key.isConnectable()) else if (key.isConnectable())
{ {
@ -553,7 +554,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
SelectChannelEndPoint endpoint = createEndPoint(channel,key); SelectChannelEndPoint endpoint = createEndPoint(channel,key);
key.attach(endpoint); key.attach(endpoint);
if (key.isReadable()) if (key.isReadable())
endpoint.schedule(); endpoint.schedule();
} }
key = null; key = null;
} }
@ -837,7 +838,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa
for (int i=0;i<100 && _selecting!=null;i++) for (int i=0;i<100 && _selecting!=null;i++)
{ {
wakeup(); wakeup();
Thread.sleep(1); Thread.sleep(10);
} }
} }
catch(Exception e) catch(Exception e)

View File

@ -954,15 +954,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint
return _engine; return _engine;
} }
/* ------------------------------------------------------------ */
@Override
public void setWritable(boolean writable)
{
// only set !writable if we are not waiting for input
if (writable || !HandshakeStatus.NEED_UNWRAP.equals(_engine.getHandshakeStatus()) || super.isBufferingOutput())
super.setWritable(writable);
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@Override @Override
public void scheduleWrite() public void scheduleWrite()

View File

@ -503,7 +503,7 @@ public class HttpConnection extends AbstractConnection implements Connection
more_in_buffer=false; more_in_buffer=false;
} }
else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof AsyncEndPoint) else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof AsyncEndPoint)
((AsyncEndPoint)_endp).setWritable(false); ((AsyncEndPoint)_endp).scheduleWrite();
} }
} }
} }
@ -802,6 +802,7 @@ public class HttpConnection extends AbstractConnection implements Connection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void closed() public void closed()
{ {
Log.debug("closed {}",this);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -670,6 +670,19 @@ public class ConnectHandler extends HandlerWrapper
writeData(); writeData();
_endPoint.shutdownOutput(); _endPoint.shutdownOutput();
} }
public void idleExpired()
{
try
{
shutdownOutput();
}
catch(Exception e)
{
Log.debug(e);
close();
}
}
} }
public class ClientToProxyConnection implements Connection public class ClientToProxyConnection implements Connection
@ -819,6 +832,19 @@ public class ConnectHandler extends HandlerWrapper
{ {
_endPoint.shutdownOutput(); _endPoint.shutdownOutput();
} }
public void idleExpired()
{
try
{
shutdownOutput();
}
catch(Exception e)
{
Log.debug(e);
close();
}
}
} }
/** /**

View File

@ -28,8 +28,10 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.io.nio.SelectorManager;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.component.AggregateLifeCycle;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Timeout.Task; import org.eclipse.jetty.util.thread.Timeout.Task;
@ -304,6 +306,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime);
} }
/* ------------------------------------------------------------------------------- */
protected void endPointClosed(SelectChannelEndPoint endpoint) protected void endPointClosed(SelectChannelEndPoint endpoint)
{ {
connectionClosed(endpoint.getConnection()); connectionClosed(endpoint.getConnection());
@ -312,22 +315,7 @@ public class SelectChannelConnector extends AbstractNIOConnector
/* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */
protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint) protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint)
{ {
return new HttpConnection(SelectChannelConnector.this,endpoint,getServer()) return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint);
{
/* ------------------------------------------------------------ */
@Override
public void cancelTimeout(Task task)
{
endpoint.getSelectSet().cancelTimeout(task);
}
/* ------------------------------------------------------------ */
@Override
public void scheduleTimeout(Task task, long timeoutMs)
{
endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
}
};
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -341,6 +329,34 @@ public class SelectChannelConnector extends AbstractNIOConnector
AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager})); AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager}));
} }
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class SelectChannelHttpConnection extends HttpConnection
{
private final SelectChannelEndPoint _endpoint;
private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2)
{
super(connector,endpoint,server);
_endpoint = endpoint2;
}
/* ------------------------------------------------------------ */
@Override
public void cancelTimeout(Task task)
{
_endpoint.getSelectSet().cancelTimeout(task);
}
/* ------------------------------------------------------------ */
@Override
public void scheduleTimeout(Task task, long timeoutMs)
{
_endpoint.getSelectSet().scheduleTimeout(task,timeoutMs);
}
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -21,6 +21,9 @@ import org.eclipse.jetty.util.log.Log;
/** /**
* @version $Revision$ $Date$ * @version $Revision$ $Date$
*
* This is not a general purpose websocket client.
* It's only for testing the websocket server and is hardwired to a specific draft version of the protocol.
*/ */
public class TestClient public class TestClient
{ {
@ -66,6 +69,11 @@ public class TestClient
_socket.close(); _socket.close();
return; return;
} }
else if (opcode == WebSocketConnectionD06.OP_PING)
{
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length(),_socket.getSoTimeout());
_generator.flush(_socket.getSoTimeout());
}
_messageBytes+=buffer.length(); _messageBytes+=buffer.length();

View File

@ -11,7 +11,6 @@ import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.Connection;
public class TestServer extends Server public class TestServer extends Server
{ {
@ -62,7 +61,7 @@ public class TestServer extends Server
_rHandler=new ResourceHandler(); _rHandler=new ResourceHandler();
_rHandler.setDirectoriesListed(true); _rHandler.setDirectoriesListed(true);
_rHandler.setResourceBase("."); _rHandler.setResourceBase("src/test/webapp");
_wsHandler.setHandler(_rHandler); _wsHandler.setHandler(_rHandler);
} }
@ -95,21 +94,27 @@ public class TestServer extends Server
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl
{ {
protected Connection _connection; protected FrameConnection _connection;
public Connection getOutbound() public FrameConnection getConnection()
{ {
return _connection; return _connection;
} }
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
_connection = connection;
if (_verbose) if (_verbose)
System.err.printf("%s#onConnect %s\n",this.getClass().getSimpleName(),connection); System.err.printf("%s#onOpen %s\n",this.getClass().getSimpleName(),connection);
}
public void onHandshake(FrameConnection connection)
{
if (_verbose)
System.err.printf("%s#onHandshake %s %s\n",this.getClass().getSimpleName(),connection,connection.getClass().getSimpleName());
_connection = connection;
} }
public void onDisconnect(int code,String message) public void onClose(int code,String message)
{ {
if (_verbose) if (_verbose)
System.err.printf("%s#onDisonnect %d %s\n",this.getClass().getSimpleName(),code,message); System.err.printf("%s#onDisonnect %d %s\n",this.getClass().getSimpleName(),code,message);
@ -147,9 +152,9 @@ public class TestServer extends Server
class TestEchoWebSocket extends TestWebSocket class TestEchoWebSocket extends TestWebSocket
{ {
@Override @Override
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
super.onConnect(connection); super.onOpen(connection);
connection.setMaxTextMessageSize(-1); connection.setMaxTextMessageSize(-1);
connection.setMaxBinaryMessageSize(-1); connection.setMaxBinaryMessageSize(-1);
} }
@ -160,16 +165,8 @@ public class TestServer extends Server
super.onFrame(flags,opcode,data,offset,length); super.onFrame(flags,opcode,data,offset,length);
try try
{ {
switch(opcode) if (!getConnection().isControl(opcode))
{ getConnection().sendFrame(flags,opcode,data,offset,length); }
case WebSocketConnectionD06.OP_CLOSE:
case WebSocketConnectionD06.OP_PING:
case WebSocketConnectionD06.OP_PONG:
break;
default:
getOutbound().sendFrame(flags,opcode,data,offset,length);
}
}
catch (IOException e) catch (IOException e)
{ {
e.printStackTrace(); e.printStackTrace();
@ -184,16 +181,16 @@ public class TestServer extends Server
class TestEchoBroadcastWebSocket extends TestWebSocket class TestEchoBroadcastWebSocket extends TestWebSocket
{ {
@Override @Override
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
super.onConnect(connection); super.onOpen(connection);
_broadcast.add(this); _broadcast.add(this);
} }
@Override @Override
public void onDisconnect(int code,String message) public void onClose(int code,String message)
{ {
super.onDisconnect(code,message); super.onClose(code,message);
_broadcast.remove(this); _broadcast.remove(this);
} }
@ -205,7 +202,7 @@ public class TestServer extends Server
{ {
try try
{ {
ws.getOutbound().sendMessage(data,offset,length); ws.getConnection().sendMessage(data,offset,length);
} }
catch (IOException e) catch (IOException e)
{ {
@ -223,7 +220,7 @@ public class TestServer extends Server
{ {
try try
{ {
ws.getOutbound().sendMessage(data); ws.getConnection().sendMessage(data);
} }
catch (IOException e) catch (IOException e)
{ {
@ -240,9 +237,9 @@ public class TestServer extends Server
{ {
@Override @Override
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
super.onConnect(connection); super.onOpen(connection);
connection.setMaxTextMessageSize(64*1024); connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024); connection.setMaxBinaryMessageSize(64*1024);
} }
@ -253,7 +250,7 @@ public class TestServer extends Server
super.onMessage(data,offset,length); super.onMessage(data,offset,length);
try try
{ {
getOutbound().sendMessage(data,offset,length); getConnection().sendMessage(data,offset,length);
} }
catch (IOException e) catch (IOException e)
{ {
@ -267,7 +264,7 @@ public class TestServer extends Server
super.onMessage(data); super.onMessage(data);
try try
{ {
getOutbound().sendMessage(data); getConnection().sendMessage(data);
} }
catch (IOException e) catch (IOException e)
{ {
@ -281,9 +278,9 @@ public class TestServer extends Server
class TestEchoFragmentWebSocket extends TestWebSocket class TestEchoFragmentWebSocket extends TestWebSocket
{ {
@Override @Override
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
super.onConnect(connection); super.onOpen(connection);
connection.setMaxTextMessageSize(64*1024); connection.setMaxTextMessageSize(64*1024);
connection.setMaxBinaryMessageSize(64*1024); connection.setMaxBinaryMessageSize(64*1024);
} }
@ -294,8 +291,8 @@ public class TestServer extends Server
super.onMessage(data,offset,length); super.onMessage(data,offset,length);
try try
{ {
getOutbound().sendFrame((byte)0x0,WebSocketConnectionD06.OP_BINARY,data,offset,length/2); getConnection().sendFrame((byte)0x0,getConnection().binaryOpcode(),data,offset,length/2);
getOutbound().sendFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,data,offset+length/2,length-length/2); getConnection().sendFrame((byte)0x8,getConnection().binaryOpcode(),data,offset+length/2,length-length/2);
} }
catch (IOException e) catch (IOException e)
{ {
@ -312,8 +309,8 @@ public class TestServer extends Server
byte[] data = message.getBytes(StringUtil.__UTF8); byte[] data = message.getBytes(StringUtil.__UTF8);
int offset=0; int offset=0;
int length=data.length; int length=data.length;
getOutbound().sendFrame((byte)0x0,WebSocketConnectionD06.OP_TEXT,data,offset,length/2); getConnection().sendFrame((byte)0x0,getConnection().textOpcode(),data,offset,length/2);
getOutbound().sendFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,offset+length/2,length-length/2); getConnection().sendFrame((byte)0x8,getConnection().textOpcode(),data,offset+length/2,length-length/2);
} }
catch (IOException e) catch (IOException e)
{ {
@ -327,7 +324,7 @@ public class TestServer extends Server
System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]"); System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]");
System.err.println(" -p|--port PORT (default 8080)"); System.err.println(" -p|--port PORT (default 8080)");
System.err.println(" -v|--verbose "); System.err.println(" -v|--verbose ");
System.err.println(" -d|--docroot file (default '.')"); System.err.println(" -d|--docroot file (default 'src/test/webapp')");
System.exit(1); System.exit(1);
} }
@ -337,7 +334,7 @@ public class TestServer extends Server
{ {
int port=8080; int port=8080;
boolean verbose=false; boolean verbose=false;
String docroot="."; String docroot="src/test/webapp";
for (int i=0;i<args.length;i++) for (int i=0;i<args.length;i++)
{ {

View File

@ -27,14 +27,14 @@ public interface WebSocket
* Called when a new websocket connection is accepted. * Called when a new websocket connection is accepted.
* @param connection The Connection object to use to send messages. * @param connection The Connection object to use to send messages.
*/ */
void onConnect(Connection connection); void onOpen(Connection connection);
/** /**
* Called when an established websocket connection closes * Called when an established websocket connection closes
* @param closeCode * @param closeCode
* @param message * @param message
*/ */
void onDisconnect(int closeCode, String message); void onClose(int closeCode, String message);
/** /**
* A nested WebSocket interface for receiving text messages * A nested WebSocket interface for receiving text messages
@ -95,34 +95,68 @@ public interface WebSocket
* @return true if this call has completely handled the frame and no further processing is needed (including aggregation and/or message delivery) * @return true if this call has completely handled the frame and no further processing is needed (including aggregation and/or message delivery)
*/ */
boolean onFrame(byte flags,byte opcode,byte[] data, int offset, int length); boolean onFrame(byte flags,byte opcode,byte[] data, int offset, int length);
void onHandshake(FrameConnection connection);
} }
/**
* A Connection interface is passed to a WebSocket instance via the {@link WebSocket#onOpen(Connection)} to
* give the application access to the specifics of the current connection. This includes methods
* for sending frames and messages as well as methods for interpreting the flags and opcodes of the connection.
*/
public interface Connection public interface Connection
{ {
String getProtocol(); String getProtocol();
void sendMessage(String data) throws IOException; void sendMessage(String data) throws IOException;
void sendMessage(byte[] data, int offset, int length) throws IOException; void sendMessage(byte[] data, int offset, int length) throws IOException;
void sendControl(byte control,byte[] data, int offset, int length) throws IOException; void disconnect();
void sendFrame(byte flags,byte opcode,byte[] data, int offset, int length) throws IOException;
void disconnect(int closeCode,String message);
boolean isOpen(); boolean isOpen();
boolean isMore(byte flags); /**
* @param size size<0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters
*/
void setMaxTextMessageSize(int size); void setMaxTextMessageSize(int size);
/**
* @param size size<0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer
*/
void setMaxBinaryMessageSize(int size); void setMaxBinaryMessageSize(int size);
/** /**
* Size in characters of the maximum text message to be received * Size in characters of the maximum text message to be received
* @return <0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters * @return size <0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters
*/ */
int getMaxTextMessageSize(); int getMaxTextMessageSize();
/** /**
* Size in bytes of the maximum binary message to be received * Size in bytes of the maximum binary message to be received
* @return <0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer * @return size <0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer
*/ */
int getMaxBinaryMessageSize(); int getMaxBinaryMessageSize();
} }
/**
* Frame Level Connection
* <p>The Connection interface at the level of sending/receiving frames rather than messages.
*
*/
public interface FrameConnection extends Connection
{
boolean isMessageComplete(byte flags);
void close(int closeCode,String message);
byte binaryOpcode();
byte textOpcode();
boolean isControl(byte opcode);
boolean isText(byte opcode);
boolean isBinary(byte opcode);
boolean isContinuation(byte opcode);
boolean isClose(byte opcode);
boolean isPing(byte opcode);
boolean isPong(byte opcode);
void sendControl(byte control,byte[] data, int offset, int length) throws IOException;
void sendFrame(byte flags,byte opcode,byte[] data, int offset, int length) throws IOException;
}
} }

View File

@ -31,13 +31,13 @@ import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame;
public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.Connection public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.FrameConnection
{ {
public final static byte LENGTH_FRAME=(byte)0x80; public final static byte LENGTH_FRAME=(byte)0x80;
public final static byte SENTINEL_FRAME=(byte)0x00; public final static byte SENTINEL_FRAME=(byte)0x00;
final IdleCheck _idle; final IdleCheck _idle;
final WebSocketParser _parser; final WebSocketParser _parser;
final WebSocketGenerator _generator; final WebSocketGenerator _generator;
@ -47,11 +47,10 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
String _key2; String _key2;
ByteArrayBuffer _hixieBytes; ByteArrayBuffer _hixieBytes;
public WebSocketConnectionD00(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, int draft) public WebSocketConnectionD00(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
throws IOException throws IOException
{ {
super(endpoint,timestamp); super(endpoint,timestamp);
// TODO - can we use the endpoint idle mechanism?
if (endpoint instanceof AsyncEndPoint) if (endpoint instanceof AsyncEndPoint)
((AsyncEndPoint)endpoint).cancelIdle(); ((AsyncEndPoint)endpoint).cancelIdle();
@ -60,19 +59,9 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_websocket = websocket; _websocket = websocket;
_protocol=protocol; _protocol=protocol;
// Select the parser/generators to use _generator = new WebSocketGeneratorD00(buffers, _endp);
switch(draft) _parser = new WebSocketParserD00(buffers, endpoint, new FrameHandlerD0(_websocket));
{
case 1:
_generator = new WebSocketGeneratorD01(buffers, _endp);
_parser = new WebSocketParserD01(buffers, endpoint, new FrameHandlerD1(_websocket));
break;
default:
_generator = new WebSocketGeneratorD00(buffers, _endp);
_parser = new WebSocketParserD00(buffers, endpoint, new FrameHandlerD0(_websocket));
}
// TODO should these be AsyncEndPoint checks/calls?
if (_endp instanceof SelectChannelEndPoint) if (_endp instanceof SelectChannelEndPoint)
{ {
final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp; final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp;
@ -95,7 +84,8 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
}; };
} }
} }
/* ------------------------------------------------------------ */
public void setHixieKeys(String key1,String key2) public void setHixieKeys(String key1,String key2)
{ {
_key1=key1; _key1=key1;
@ -103,6 +93,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_hixieBytes=new IndirectNIOBuffer(16); _hixieBytes=new IndirectNIOBuffer(16);
} }
/* ------------------------------------------------------------ */
public Connection handle() throws IOException public Connection handle() throws IOException
{ {
try try
@ -146,7 +137,9 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
} }
} }
_websocket.onConnect(this); if (_websocket instanceof OnFrame)
((OnFrame)_websocket).onHandshake(this);
_websocket.onOpen(this);
return this; return this;
} }
@ -169,6 +162,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
} }
catch(IOException e) catch(IOException e)
{ {
Log.debug(e);
try try
{ {
_endp.close(); _endp.close();
@ -184,12 +178,19 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
if (_endp.isOpen()) if (_endp.isOpen())
{ {
_idle.access(_endp); _idle.access(_endp);
if (_endp.isInputShutdown() && _generator.isBufferEmpty())
_endp.close();
else
checkWriteable();
checkWriteable(); checkWriteable();
} }
} }
return this; return this;
} }
/* ------------------------------------------------------------ */
private void doTheHixieHixieShake() private void doTheHixieHixieShake()
{ {
byte[] result=WebSocketConnectionD00.doTheHixieHixieShake( byte[] result=WebSocketConnectionD00.doTheHixieHixieShake(
@ -199,25 +200,29 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
_hixieBytes.clear(); _hixieBytes.clear();
_hixieBytes.put(result); _hixieBytes.put(result);
} }
/* ------------------------------------------------------------ */
public boolean isOpen() public boolean isOpen()
{ {
return _endp!=null&&_endp.isOpen(); return _endp!=null&&_endp.isOpen();
} }
/* ------------------------------------------------------------ */
public boolean isIdle() public boolean isIdle()
{ {
return _parser.isBufferEmpty() && _generator.isBufferEmpty(); return _parser.isBufferEmpty() && _generator.isBufferEmpty();
} }
/* ------------------------------------------------------------ */
public boolean isSuspended() public boolean isSuspended()
{ {
return false; return false;
} }
/* ------------------------------------------------------------ */
public void closed() public void closed()
{ {
_websocket.onDisconnect(0,""); _websocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -265,7 +270,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void disconnect(int code, String message) public void close(int code, String message)
{ {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }
@ -378,7 +383,9 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
response.addHeader("WebSocket-Protocol",subprotocol); response.addHeader("WebSocket-Protocol",subprotocol);
response.sendError(101,"Web Socket Protocol Handshake"); response.sendError(101,"Web Socket Protocol Handshake");
response.flushBuffer(); response.flushBuffer();
_websocket.onConnect(this); if (_websocket instanceof OnFrame)
((OnFrame)_websocket).onHandshake(this);
_websocket.onOpen(this);
} }
} }
@ -444,92 +451,57 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc
public void close(int code,String message) public void close(int code,String message)
{ {
disconnect(code,message); close(code,message);
} }
} }
class FrameHandlerD1 implements WebSocketParser.FrameHandler public boolean isMessageComplete(byte flags)
{ {
public final static byte PING=1; return true;
public final static byte PONG=1;
final WebSocket _websocket;
final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
boolean _fragmented=false;
FrameHandlerD1(WebSocket websocket)
{
_websocket=websocket;
}
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
try
{
byte[] array=buffer.array();
if (opcode==0)
{
if (isMore(flags))
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_fragmented=true;
}
else if (_fragmented)
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
if (_websocket instanceof WebSocket.OnTextMessage)
((WebSocket.OnTextMessage)_websocket).onMessage(_utf8.toString());
_utf8.reset();
_fragmented=false;
}
else
{
if (_websocket instanceof WebSocket.OnTextMessage)
((WebSocket.OnTextMessage)_websocket).onMessage(buffer.toString(StringUtil.__UTF8));
}
}
else if (opcode==PING)
{
sendFrame(flags,PONG,buffer.array(),buffer.getIndex(),buffer.length());
}
else if (opcode==PONG)
{
}
else
{
if (isMore(flags))
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length());
}
else if (_fragmented)
{
if (_websocket instanceof WebSocket.OnFrame)
((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length());
}
else
{
if (_websocket instanceof WebSocket.OnBinaryMessage)
((WebSocket.OnBinaryMessage)_websocket).onMessage(array,buffer.getIndex(),buffer.length());
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
public void close(int code,String message)
{
disconnect(code,message);
}
} }
public byte binaryOpcode()
{
return LENGTH_FRAME;
}
public byte textOpcode()
{
return SENTINEL_FRAME;
}
public boolean isControl(byte opcode)
{
return false;
}
public boolean isText(byte opcode)
{
return (opcode&LENGTH_FRAME)==0;
}
public boolean isBinary(byte opcode)
{
return (opcode&LENGTH_FRAME)!=0;
}
public boolean isContinuation(byte opcode)
{
return false;
}
public boolean isClose(byte opcode)
{
return false;
}
public boolean isPing(byte opcode)
{
return false;
}
public boolean isPong(byte opcode)
{
return false;
}
} }

View File

@ -29,7 +29,6 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.B64Code;
import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.TypeUtil;
import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.websocket.WebSocket.OnFrame; import org.eclipse.jetty.websocket.WebSocket.OnFrame;
@ -39,25 +38,25 @@ import org.eclipse.jetty.websocket.WebSocket.OnControl;
public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection
{ {
public final static byte OP_CONTINUATION = 0x00; final static byte OP_CONTINUATION = 0x00;
public final static byte OP_CLOSE = 0x01; final static byte OP_CLOSE = 0x01;
public final static byte OP_PING = 0x02; final static byte OP_PING = 0x02;
public final static byte OP_PONG = 0x03; final static byte OP_PONG = 0x03;
public final static byte OP_TEXT = 0x04; final static byte OP_TEXT = 0x04;
public final static byte OP_BINARY = 0x05; final static byte OP_BINARY = 0x05;
public final static int CLOSE_NORMAL=1000; final static int CLOSE_NORMAL=1000;
public final static int CLOSE_SHUTDOWN=1001; final static int CLOSE_SHUTDOWN=1001;
public final static int CLOSE_PROTOCOL=1002; final static int CLOSE_PROTOCOL=1002;
public final static int CLOSE_BADDATA=1003; final static int CLOSE_BADDATA=1003;
public final static int CLOSE_LARGE=1004; final static int CLOSE_LARGE=1004;
public static boolean isLastFrame(int flags) static boolean isLastFrame(int flags)
{ {
return (flags&0x8)!=0; return (flags&0x8)!=0;
} }
public static boolean isControlFrame(int opcode) static boolean isControlFrame(int opcode)
{ {
switch(opcode) switch(opcode)
{ {
@ -98,333 +97,16 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
} }
} }
private final WebSocketParser.FrameHandler _frameHandler= new WebSocketParser.FrameHandler() private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06();
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private ByteArrayBuffer _aggregate;
private byte _opcode=-1;
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
boolean more=(flags&0x8)==0;
synchronized(WebSocketConnectionD06.this)
{
// Ignore incoming after a close
if (_closedIn)
return;
try
{
byte[] array=buffer.array();
// Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null)
{
if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
return;
}
if (_onControl!=null && isControlFrame(opcode))
{
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return;
}
switch(opcode)
{
case WebSocketConnectionD06.OP_CONTINUATION:
{
// If text, append to the message buffer
if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
{
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{
// If this is the last fragment, deliver the text buffer
if (more && _onTextMessage!=null)
{
_opcode=-1;
String msg =_utf8.toString();
_utf8.reset();
_onTextMessage.onMessage(msg);
}
}
else
{
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
}
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{
if (_aggregate.space()<_aggregate.length())
{
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{
_aggregate.put(buffer);
// If this is the last fragment, deliver
if (!more && _onBinaryMessage!=null)
{
try
{
_onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
}
finally
{
_opcode=-1;
_aggregate.clear();
}
}
}
}
break;
}
case WebSocketConnectionD06.OP_PING:
{
Log.debug("PING {}",this);
if (!_closedOut)
_connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
break;
}
case WebSocketConnectionD06.OP_PONG:
{
Log.debug("PONG {}",this);
break;
}
case WebSocketConnectionD06.OP_CLOSE:
{
int code=-1;
String message=null;
if (buffer.length()>=2)
{
code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
if (buffer.length()>2)
message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
}
closeIn(code,message);
break;
}
case WebSocketConnectionD06.OP_TEXT:
{
if(_onTextMessage!=null)
{
if (more)
{
if (_connection.getMaxTextMessageSize()>=0)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD06.OP_TEXT;
else
{
_utf8.reset();
_opcode=-1;
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
}
}
}
else
{
// Deliver the message
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
}
}
break;
}
default:
{
if (_onBinaryMessage!=null)
{
if (more)
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{
_opcode=opcode;
if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer);
}
}
}
else
{
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
}
}
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
}
public void close(int code,String message)
{
}
public String toString()
{
return WebSocketConnectionD06.this.toString()+"FH";
}
};
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private final WebSocket.Connection _connection = new WebSocket.Connection() private final WebSocket.FrameConnection _connection = new FrameConnectionD06();
{
volatile boolean _disconnecting;
int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String)
*/
public synchronized void sendMessage(String content) throws IOException
{
if (_closedOut)
throw new IOException("closing");
byte[] data = content.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int)
*/
public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int)
*/
public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public boolean isMore(byte flags)
{
return (flags&0x8)==0;
}
/* ------------------------------------------------------------ */
public boolean isOpen()
{
return _endp!=null&&_endp.isOpen();
}
/* ------------------------------------------------------------ */
public void disconnect(int code, String message)
{
if (_disconnecting)
return;
_disconnecting=true;
WebSocketConnectionD06.this.closeOut(code,message);
}
/* ------------------------------------------------------------ */
public void disconnect()
{
if (_disconnecting)
return;
_disconnecting=true;
WebSocketConnectionD06.this.closeOut(1000,null);
}
/* ------------------------------------------------------------ */
public void setMaxTextMessageSize(int size)
{
_maxTextMessage=size;
}
/* ------------------------------------------------------------ */
public void setMaxBinaryMessageSize(int size)
{
_maxBinaryMessage=size;
}
/* ------------------------------------------------------------ */
public int getMaxTextMessageSize()
{
return _maxTextMessage;
}
/* ------------------------------------------------------------ */
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessage;
}
/* ------------------------------------------------------------ */
public String getProtocol()
{
return _protocol;
}
};
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, int draft) public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol)
throws IOException throws IOException
{ {
super(endpoint,timestamp); super(endpoint,timestamp);
@ -518,6 +200,8 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
_idle.access(_endp); _idle.access(_endp);
if (_closedIn && _closedOut && _generator.isBufferEmpty()) if (_closedIn && _closedOut && _generator.isBufferEmpty())
_endp.close(); _endp.close();
else if (_endp.isInputShutdown() && !_closedIn)
closeIn(CLOSE_PROTOCOL,null);
else else
checkWriteable(); checkWriteable();
} }
@ -548,7 +232,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
public void closed() public void closed()
{ {
_webSocket.onDisconnect(WebSocketConnectionD06.CLOSE_NORMAL,""); _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,"");
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -612,7 +296,392 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
private void checkWriteable() private void checkWriteable()
{ {
if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint) if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint)
{
((AsyncEndPoint)_endp).scheduleWrite(); ((AsyncEndPoint)_endp).scheduleWrite();
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameConnectionD06 implements WebSocket.FrameConnection
{
volatile boolean _disconnecting;
int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize;
int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize;
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String)
*/
public synchronized void sendMessage(String content) throws IOException
{
if (_closedOut)
throw new IOException("closing");
byte[] data = content.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int)
*/
public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int)
*/
public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public void sendControl(byte control, byte[] data, int offset, int length) throws IOException
{
if (_closedOut)
throw new IOException("closing");
_generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime());
_generator.flush();
checkWriteable();
_idle.access(_endp);
}
/* ------------------------------------------------------------ */
public boolean isMessageComplete(byte flags)
{
return isLastFrame(flags);
}
/* ------------------------------------------------------------ */
public boolean isOpen()
{
return _endp!=null&&_endp.isOpen();
}
/* ------------------------------------------------------------ */
public void close(int code, String message)
{
if (_disconnecting)
return;
_disconnecting=true;
WebSocketConnectionD06.this.closeOut(code,message);
}
/* ------------------------------------------------------------ */
public void setMaxTextMessageSize(int size)
{
_maxTextMessage=size;
}
/* ------------------------------------------------------------ */
public void setMaxBinaryMessageSize(int size)
{
_maxBinaryMessage=size;
}
/* ------------------------------------------------------------ */
public int getMaxTextMessageSize()
{
return _maxTextMessage;
}
/* ------------------------------------------------------------ */
public int getMaxBinaryMessageSize()
{
return _maxBinaryMessage;
}
/* ------------------------------------------------------------ */
public String getProtocol()
{
return _protocol;
}
/* ------------------------------------------------------------ */
public byte binaryOpcode()
{
return OP_BINARY;
}
/* ------------------------------------------------------------ */
public byte textOpcode()
{
return OP_TEXT;
}
/* ------------------------------------------------------------ */
public boolean isControl(byte opcode)
{
return isControlFrame(opcode);
}
/* ------------------------------------------------------------ */
public boolean isText(byte opcode)
{
return opcode==OP_TEXT;
}
/* ------------------------------------------------------------ */
public boolean isBinary(byte opcode)
{
return opcode==OP_BINARY;
}
/* ------------------------------------------------------------ */
public boolean isContinuation(byte opcode)
{
return opcode==OP_CONTINUATION;
}
/* ------------------------------------------------------------ */
public boolean isClose(byte opcode)
{
return opcode==OP_CLOSE;
}
/* ------------------------------------------------------------ */
public boolean isPing(byte opcode)
{
return opcode==OP_PING;
}
/* ------------------------------------------------------------ */
public boolean isPong(byte opcode)
{
return opcode==OP_PONG;
}
/* ------------------------------------------------------------ */
public void disconnect()
{
close(CLOSE_NORMAL,null);
}
/* ------------------------------------------------------------ */
public String toString()
{
return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort();
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class FrameHandlerD06 implements WebSocketParser.FrameHandler
{
private final Utf8StringBuilder _utf8 = new Utf8StringBuilder();
private ByteArrayBuffer _aggregate;
private byte _opcode=-1;
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
boolean more=(flags&0x8)==0;
synchronized(WebSocketConnectionD06.this)
{
// Ignore incoming after a close
if (_closedIn)
return;
try
{
byte[] array=buffer.array();
// Deliver frame if websocket is a FrameWebSocket
if (_onFrame!=null)
{
if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()))
return;
}
if (_onControl!=null && isControlFrame(opcode))
{
if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length()))
return;
}
switch(opcode)
{
case WebSocketConnectionD06.OP_CONTINUATION:
{
// If text, append to the message buffer
if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0)
{
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
{
// If this is the last fragment, deliver the text buffer
if (more && _onTextMessage!=null)
{
_opcode=-1;
String msg =_utf8.toString();
_utf8.reset();
_onTextMessage.onMessage(msg);
}
}
else
{
_connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
_utf8.reset();
_opcode=-1;
}
}
else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0)
{
if (_aggregate.space()<_aggregate.length())
{
_connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
_aggregate.clear();
_opcode=-1;
}
else
{
_aggregate.put(buffer);
// If this is the last fragment, deliver
if (!more && _onBinaryMessage!=null)
{
try
{
_onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length());
}
finally
{
_opcode=-1;
_aggregate.clear();
}
}
}
}
break;
}
case WebSocketConnectionD06.OP_PING:
{
Log.debug("PING {}",this);
if (!_closedOut)
_connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length());
break;
}
case WebSocketConnectionD06.OP_PONG:
{
Log.debug("PONG {}",this);
break;
}
case WebSocketConnectionD06.OP_CLOSE:
{
int code=-1;
String message=null;
if (buffer.length()>=2)
{
code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1];
if (buffer.length()>2)
message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8);
}
closeIn(code,message);
break;
}
case WebSocketConnectionD06.OP_TEXT:
{
if(_onTextMessage!=null)
{
if (more)
{
if (_connection.getMaxTextMessageSize()>=0)
{
// If this is a text fragment, append to buffer
if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize()))
_opcode=WebSocketConnectionD06.OP_TEXT;
else
{
_utf8.reset();
_opcode=-1;
_connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars");
}
}
}
else
{
// Deliver the message
_onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8));
}
}
break;
}
default:
{
if (_onBinaryMessage!=null)
{
if (more)
{
if (_connection.getMaxBinaryMessageSize()>=0)
{
if (buffer.length()>_connection.getMaxBinaryMessageSize())
{
_connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize());
if (_aggregate!=null)
_aggregate.clear();
_opcode=-1;
}
else
{
_opcode=opcode;
if (_aggregate==null)
_aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize());
_aggregate.put(buffer);
}
}
}
else
{
_onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length());
}
}
}
}
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
}
public void close(int code,String message)
{
_connection.close(code,message);
}
public String toString()
{
return WebSocketConnectionD06.this.toString()+"FH";
}
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
@ -637,7 +706,9 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc
response.addHeader("Sec-WebSocket-Protocol",subprotocol); response.addHeader("Sec-WebSocket-Protocol",subprotocol);
response.sendError(101); response.sendError(101);
_webSocket.onConnect(_connection); if (_onFrame!=null)
_onFrame.onHandshake(_connection);
_webSocket.onOpen(_connection);
} }
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */

View File

@ -21,6 +21,7 @@ import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.util.log.Log;
/** /**
* Factory to create WebSocket connections * Factory to create WebSocket connections
@ -120,16 +121,16 @@ public class WebSocketFactory
final WebSocketConnection connection; final WebSocketConnection connection;
switch (draft) switch (draft)
{ {
case 6: case -1:
connection = new WebSocketConnectionD06(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, draft); case 0:
connection = new WebSocketConnectionD00(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
break;
case 6:
connection = new WebSocketConnectionD06(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol);
break; break;
case 5:
case 4:
case 3:
case 2:
throw new HttpException(400, "Unsupported draft specification: " + draft);
default: default:
connection = new WebSocketConnectionD00(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, draft); Log.warn("Unsupported Websocket version: "+draft);
throw new HttpException(400, "Unsupported draft specification: " + draft);
} }
// Let the connection finish processing the handshake // Let the connection finish processing the handshake

View File

@ -131,7 +131,7 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator
if (!_endp.isOpen()) if (!_endp.isOpen())
throw new EofException(); throw new EofException();
if (_buffer!=null) if (_buffer!=null && _buffer.hasContent())
return _endp.flush(_buffer); return _endp.flush(_buffer);
return 0; return 0;

View File

@ -1,181 +0,0 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
/* ------------------------------------------------------------ */
/** WebSocketGenerator.
* This class generates websocket packets.
* It is fully synchronized because it is likely that async
* threads will call the addMessage methods while other
* threads are flushing the generator.
*/
public class WebSocketGeneratorD01 implements WebSocketGenerator
{
final private WebSocketBuffers _buffers;
final private EndPoint _endp;
private Buffer _buffer;
public WebSocketGeneratorD01(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;
_endp=endp;
}
public synchronized void addFrame(byte flags,byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
if (_buffer.space() == 0)
expelBuffer(blockFor);
opcode = (byte)(opcode & 0x0f);
while (length>0)
{
// slice a fragment off
int fragment=length;
if (fragment+10>_buffer.capacity())
{
fragment=_buffer.capacity()-10;
bufferPut((byte)(0x80|opcode), blockFor);
}
else if ((flags&0x8)==0)
bufferPut(opcode, blockFor);
else
bufferPut((byte)(0x80|opcode), blockFor);
if (fragment>0xffff)
{
bufferPut((byte)0x7f, blockFor);
bufferPut((byte)((fragment>>56)&0x7f), blockFor);
bufferPut((byte)((fragment>>48)&0xff), blockFor);
bufferPut((byte)((fragment>>40)&0xff), blockFor);
bufferPut((byte)((fragment>>32)&0xff), blockFor);
bufferPut((byte)((fragment>>24)&0xff), blockFor);
bufferPut((byte)((fragment>>16)&0xff), blockFor);
bufferPut((byte)((fragment>>8)&0xff), blockFor);
bufferPut((byte)(fragment&0xff), blockFor);
}
else if (fragment >=0x7e)
{
bufferPut((byte)126, blockFor);
bufferPut((byte)(fragment>>8), blockFor);
bufferPut((byte)(fragment&0xff), blockFor);
}
else
{
bufferPut((byte)fragment, blockFor);
}
int remaining = fragment;
while (remaining > 0)
{
_buffer.compact();
int chunk = remaining < _buffer.space() ? remaining : _buffer.space();
_buffer.put(content, offset + (fragment - remaining), chunk);
remaining -= chunk;
if (_buffer.space() > 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
else
{
// Forcibly flush the data, issuing a blocking write
expelBuffer(blockFor);
if (remaining == 0)
{
// Gently flush the data, issuing a non-blocking write
flushBuffer();
}
}
}
offset+=fragment;
length-=fragment;
}
}
private synchronized void bufferPut(byte datum, long blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
_buffer.put(datum);
if (_buffer.space() == 0)
expelBuffer(blockFor);
}
public synchronized int flush(int blockFor) throws IOException
{
return expelBuffer(blockFor);
}
public synchronized int flush() throws IOException
{
int flushed = flushBuffer();
if (_buffer!=null && _buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return flushed;
}
private synchronized int flushBuffer() throws IOException
{
if (!_endp.isOpen())
throw new EofException();
if (_buffer!=null)
return _endp.flush(_buffer);
return 0;
}
private synchronized int expelBuffer(long blockFor) throws IOException
{
if (_buffer==null)
return 0;
int result = flushBuffer();
_buffer.compact();
if (!_endp.isBlocking())
{
while (_buffer.space()==0)
{
// TODO: in case the I/O system signals write ready, but when we attempt to write we cannot
// TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout
boolean ready = _endp.blockWritable(blockFor);
if (!ready)
throw new IOException("Write timeout");
result += flushBuffer();
_buffer.compact();
}
}
return result;
}
public synchronized boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
}

View File

@ -1,224 +0,0 @@
// ========================================================================
// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.eclipse.jetty.util.log.Log;
/* ------------------------------------------------------------ */
/**
* Parser the WebSocket protocol.
*
*/
public class WebSocketParserD01 implements WebSocketParser
{
public enum State {
START(1), LENGTH_7(2), LENGTH_16(4), LENGTH_63(10), DATA(10);
int _minSize;
State(int minSize)
{
_minSize=minSize;
}
int getMinSize()
{
return _minSize;
}
};
private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final FrameHandler _handler;
private State _state=State.START;
private Buffer _buffer;
private byte _flags;
private byte _opcode;
private int _count;
private long _length;
private Utf8StringBuilder _utf8;
/* ------------------------------------------------------------ */
/**
* @param buffers The buffers to use for parsing. Only the {@link Buffers#getBuffer()} is used.
* This should be a direct buffer if binary data is mostly used or an indirect buffer if utf-8 data
* is mostly used.
* @param endp
* @param handler
*/
public WebSocketParserD01(WebSocketBuffers buffers, EndPoint endp, FrameHandler handler)
{
_buffers=buffers;
_endp=endp;
_handler=handler;
}
/* ------------------------------------------------------------ */
public boolean isBufferEmpty()
{
return _buffer==null || _buffer.length()==0;
}
/* ------------------------------------------------------------ */
public Buffer getBuffer()
{
return _buffer;
}
/* ------------------------------------------------------------ */
/** Parse to next event.
* Parse to the next {@link WebSocketParser.FrameHandler} event or until no more data is
* available. Fill data from the {@link EndPoint} only as necessary.
* @return An indication of progress or otherwise. -1 indicates EOF, 0 indicates
* that no bytes were read and no messages parsed. A positive number indicates either
* the bytes filled or the messages parsed.
*/
public int parseNext()
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
int total_filled=0;
// Loop until an datagram call back or can't fill anymore
while(true)
{
int available=_buffer.length();
// Fill buffer if we need a byte or need length
if (available < _state.getMinSize() || _state==State.DATA && available<_count)
{
// compact to mark (set at start of data)
_buffer.compact();
// if no space, then the data is too big for buffer
if (_buffer.space() == 0)
throw new IllegalStateException("FULL");
// catch IOExceptions (probably EOF) and try to parse what we have
try
{
int filled=_endp.isOpen()?_endp.fill(_buffer):-1;
if (filled<=0)
return total_filled>0?total_filled:-1;
total_filled+=filled;
available=_buffer.length();
}
catch(IOException e)
{
Log.debug(e);
return total_filled>0?total_filled:-1;
}
}
// Parse the buffer byte by byte (unless it is STATE_DATA)
byte b;
while (_state!=State.DATA && available-->0)
{
switch (_state)
{
case START:
b=_buffer.get();
_opcode=(byte)(b&0xf);
_flags=(byte)(b>>4);
_state=State.LENGTH_7;
continue;
case LENGTH_7:
b=_buffer.get();
switch(b)
{
case 127:
_length=0;
_count=8;
_state=State.LENGTH_63;
break;
case 126:
_length=0;
_count=2;
_state=State.LENGTH_16;
break;
default:
_length=(0x7f&b);
_count=(int)_length;
_state=State.DATA;
}
continue;
case LENGTH_16:
b=_buffer.get();
_length = _length<<8 | b;
if (--_count==0)
{
if (_length>=_buffer.capacity()-4)
throw new IllegalStateException("TOO LARGE");
_count=(int)_length;
_state=State.DATA;
}
continue;
case LENGTH_63:
b=_buffer.get();
_length = _length<<8 | b;
if (--_count==0)
{
if (_length>=_buffer.capacity()-10)
throw new IllegalStateException("TOO LARGE");
_count=(int)_length;
_state=State.DATA;
}
continue;
}
}
if (_state==State.DATA && available>=_count)
{
_handler.onFrame(_flags, _opcode, _buffer.get(_count));
_count=0;
_state=State.START;
if (_buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
}
return total_filled;
}
}
}
/* ------------------------------------------------------------ */
public void fill(Buffer buffer)
{
if (buffer!=null && buffer.length()>0)
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
_buffer.put(buffer);
buffer.clear();
}
}
}

View File

@ -21,7 +21,7 @@ public class WebSocketGeneratorD00Test
{ {
WebSocketBuffers buffers = new WebSocketBuffers(1024); WebSocketBuffers buffers = new WebSocketBuffers(1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
_generator = new WebSocketGeneratorD01(buffers, endPoint); _generator = new WebSocketGeneratorD00(buffers, endPoint);
_out = new ByteArrayBuffer(4096); _out = new ByteArrayBuffer(4096);
endPoint.setOut(_out); endPoint.setOut(_out);
} }
@ -32,7 +32,32 @@ public class WebSocketGeneratorD00Test
byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8); byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x0,(byte)0x04,data,0,data.length,0); _generator.addFrame((byte)0x0,(byte)0x04,data,0,data.length,0);
_generator.flush(); _generator.flush();
assertEquals(4,_out.get()); assertEquals((byte)0x04,_out.get());
assertEquals('H',_out.get());
assertEquals('e',_out.get());
assertEquals('l',_out.get());
assertEquals('l',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals(' ',_out.get());
assertEquals('W',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals('r',_out.get());
assertEquals('l',_out.get());
assertEquals('d',_out.get());
assertEquals((byte)0xff,_out.get());
}
@Test
public void testOneBinaryString() throws Exception
{
byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0x0,(byte)0x84,data,0,data.length,0);
_generator.flush();
assertEquals((byte)0x84,_out.get());
assertEquals(15,_out.get()); assertEquals(15,_out.get());
assertEquals('H',_out.get()); assertEquals('H',_out.get());
assertEquals('e',_out.get()); assertEquals('e',_out.get());
@ -51,64 +76,4 @@ public class WebSocketGeneratorD00Test
assertEquals('d',_out.get()); assertEquals('d',_out.get());
} }
@Test
public void testOneMediumBuffer() throws Exception
{
byte[] b=new byte[501];
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x0,(byte)0xf,b,0,b.length,0);
_generator.flush();
assertEquals(0x0f,_out.get());
assertEquals(0x7e,_out.get());
assertEquals((b.length>>8),0xff&_out.get());
assertEquals(0xff&b.length,0xff&_out.get());
for (int i=0;i<b.length;i++)
assertEquals('0'+(i%10),0xff&_out.get());
}
@Test
public void testFragmentBuffer() throws Exception
{
byte[] b=new byte[3001];
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0x0,(byte)0xf,b,0,b.length,0);
_generator.flush();
assertEquals(0x8f,_out.get()&0xff);
assertEquals(0x7e,_out.get()&0xff);
int m=0;
int frag=((_out.get()&0xff)<<8)|_out.get()&0xff;
for (int i=0;i<frag;i++)
{
assertEquals("b="+i,'0'+(m%10),0xff&_out.get());
m++;
}
assertEquals(0x8f,_out.get()&0xff);
assertEquals(0x7e,_out.get()&0xff);
frag=((_out.get()&0xff)<<8)|_out.get()&0xff;
for (int i=0;i<frag;i++)
{
assertEquals("b="+i,'0'+(m%10),0xff&_out.get());
m++;
}
assertEquals(0x0f,_out.get()&0xff);
assertEquals(0x7e,_out.get()&0xff);
frag=((_out.get()&0xff)<<8)|_out.get()&0xff;
for (int i=0;i<frag;i++)
{
assertEquals("b="+i,'0'+(m%10),0xff&_out.get());
m++;
}
assertEquals(b.length,m);
}
} }

View File

@ -1,97 +0,0 @@
package org.eclipse.jetty.websocket;
import static junit.framework.Assert.assertEquals;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketGeneratorD01Test
{
private ByteArrayBuffer _out;
private WebSocketGenerator _generator;
@Before
public void setUp() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
_generator = new WebSocketGeneratorD00(buffers, endPoint);
_out = new ByteArrayBuffer(2048);
endPoint.setOut(_out);
}
@Test
public void testOneString() throws Exception
{
byte[] data = "Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0,(byte)0x04,data,0,data.length,0);
_generator.flush();
assertEquals(4,_out.get());
assertEquals('H',_out.get());
assertEquals('e',_out.get());
assertEquals('l',_out.get());
assertEquals('l',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals(' ',_out.get());
assertEquals('W',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals('r',_out.get());
assertEquals('l',_out.get());
assertEquals('d',_out.get());
assertEquals(0xff,0xff&_out.get());
}
@Test
public void testOneBuffer() throws Exception
{
String string = "Hell\uFF4F W\uFF4Frld";
byte[] bytes=string.getBytes(StringUtil.__UTF8);
_generator.addFrame((byte)0,(byte)0x84,bytes,0,bytes.length,0);
_generator.flush();
assertEquals(0x84,0xff&_out.get());
assertEquals(15,0xff&_out.get());
assertEquals('H',_out.get());
assertEquals('e',_out.get());
assertEquals('l',_out.get());
assertEquals('l',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals(' ',_out.get());
assertEquals('W',_out.get());
assertEquals(0xEF,0xff&_out.get());
assertEquals(0xBD,0xff&_out.get());
assertEquals(0x8F,0xff&_out.get());
assertEquals('r',_out.get());
assertEquals('l',_out.get());
assertEquals('d',_out.get());
}
@Test
public void testOneLongBuffer() throws Exception
{
byte[] b=new byte[150];
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addFrame((byte)0,(byte)0x85,b,0,b.length,0);
_generator.flush();
assertEquals(0x85,0xff&_out.get());
assertEquals(0x80|(b.length>>7),0xff&_out.get());
assertEquals(0x7f&b.length,0xff&_out.get());
for (int i=0;i<b.length;i++)
assertEquals('0'+(i%10),0xff&_out.get());
}
}

View File

@ -111,7 +111,7 @@ public class WebSocketLoadTest
{ {
private volatile Connection outbound; private volatile Connection outbound;
public void onConnect(Connection outbound) public void onOpen(Connection outbound)
{ {
this.outbound = outbound; this.outbound = outbound;
} }
@ -125,11 +125,11 @@ public class WebSocketLoadTest
} }
catch (IOException x) catch (IOException x)
{ {
outbound.disconnect(0,""); outbound.disconnect();
} }
} }
public void onDisconnect(int closeCode, String message) public void onClose(int closeCode, String message)
{ {
} }
} }

View File

@ -228,7 +228,7 @@ public class WebSocketMessageD00Test
private final CountDownLatch latch = new CountDownLatch(1); private final CountDownLatch latch = new CountDownLatch(1);
private volatile Connection outbound; private volatile Connection outbound;
public void onConnect(Connection outbound) public void onOpen(Connection outbound)
{ {
this.outbound = outbound; this.outbound = outbound;
if (onConnect) if (onConnect)
@ -250,7 +250,7 @@ public class WebSocketMessageD00Test
return latch.await(time, TimeUnit.MILLISECONDS); return latch.await(time, TimeUnit.MILLISECONDS);
} }
public void onDisconnect(int code,String message) public void onClose(int code,String message)
{ {
} }
} }

View File

@ -1,321 +0,0 @@
package org.eclipse.jetty.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketMessageD01Test
{
private static Server _server;
private static Connector _connector;
private static TestWebSocket _serverWebSocket;
@BeforeClass
public static void startServer() throws Exception
{
_server = new Server();
_connector = new SelectChannelConnector();
_server.addConnector(_connector);
WebSocketHandler wsHandler = new WebSocketHandler()
{
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
_serverWebSocket = new TestWebSocket();
_serverWebSocket.onConnect=("onConnect".equals(protocol));
return _serverWebSocket;
}
};
wsHandler.setBufferSize(8192);
wsHandler.setMaxIdleTime(1000);
wsHandler.setHandler(new DefaultHandler());
_server.setHandler(wsHandler);
_server.start();
}
@AfterClass
public static void stopServer() throws Exception
{
_server.stop();
_server.join();
}
@Test
public void testServerSendBigStringMessage() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Draft: 1\r\n" +
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5\r\n" +
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00\r\n" +
"\r\n"+
"^n:ds[4U").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 WebSocket Protocol Handshake\r\n",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
lookFor("8jKS'y:G*Co,Wxa-",input);
// Server sends a big message
StringBuilder message = new StringBuilder();
String text = "0123456789ABCDEF";
for (int i = 0; i < (0x2000) / text.length(); i++)
message.append(text);
String data=message.toString();
_serverWebSocket.outbound.sendMessage(data);
assertEquals(0x80,input.read());
assertEquals(0x7e,input.read());
assertEquals(0x1f,input.read());
assertEquals(0xf6,input.read());
lookFor(data.substring(0,0x1ff6),input);
assertEquals(0x00,input.read());
assertEquals(0x0A,input.read());
lookFor(data.substring(0x1ff6),input);
}
@Test
public void testServerSendOnConnect() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Draft: 1\r\n" +
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5\r\n" +
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00\r\n" +
"\r\n"+
"^n:ds[4U").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 WebSocket Protocol Handshake\r\n",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
lookFor("8jKS'y:G*Co,Wxa-",input);
assertEquals(0x00,input.read());
assertEquals(0x0f,input.read());
lookFor("sent on connect",input);
}
@Test
public void testIdle() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Draft: 1\r\n" +
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5\r\n" +
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00\r\n" +
"\r\n"+
"^n:ds[4U").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 WebSocket Protocol Handshake\r\n",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
lookFor("8jKS'y:G*Co,Wxa-",input);
assertEquals(0x00,input.read());
assertEquals(0x0f,input.read());
lookFor("sent on connect",input);
assertTrue(_serverWebSocket.awaitDisconnected(5000));
try
{
_serverWebSocket.outbound.sendMessage("Don't send");
assertTrue(false);
}
catch(IOException e)
{
assertTrue(true);
}
}
@Test
public void testClose() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"Sec-WebSocket-Draft: 1\r\n" +
"Sec-WebSocket-Protocol: onConnect\r\n" +
"Sec-WebSocket-Key1: 4 @1 46546xW%0l 1 5\r\n" +
"Sec-WebSocket-Key2: 12998 5 Y3 1 .P00\r\n" +
"\r\n"+
"^n:ds[4U").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
lookFor("HTTP/1.1 101 WebSocket Protocol Handshake\r\n",input);
skipTo("\r\n\r\n",input);
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
lookFor("8jKS'y:G*Co,Wxa-",input);
assertEquals(0x00,input.read());
assertEquals(0x0f,input.read());
lookFor("sent on connect",input);
socket.close();
assertTrue(_serverWebSocket.awaitDisconnected(500));
try
{
_serverWebSocket.outbound.sendMessage("Don't send");
assertTrue(false);
}
catch(IOException e)
{
assertTrue(true);
}
}
private void lookFor(String string,InputStream in)
throws IOException
{
while(true)
{
int b = in.read();
if (b<0)
throw new EOFException();
assertEquals((int)string.charAt(0),b);
if (string.length()==1)
break;
string=string.substring(1);
}
}
private void skipTo(String string,InputStream in)
throws IOException
{
int state=0;
while(true)
{
int b = in.read();
if (b<0)
throw new EOFException();
if (b==string.charAt(state))
{
state++;
if (state==string.length())
break;
}
else
state=0;
}
}
private static class TestWebSocket implements WebSocket
{
boolean onConnect=false;
private final CountDownLatch connected = new CountDownLatch(1);
private final CountDownLatch disconnected = new CountDownLatch(1);
private volatile Connection outbound;
public void onConnect(Connection outbound)
{
this.outbound = outbound;
if (onConnect)
{
try
{
outbound.sendMessage("sent on connect");
}
catch(IOException e)
{
e.printStackTrace();
}
}
connected.countDown();
}
private boolean awaitConnected(long time) throws InterruptedException
{
return connected.await(time, TimeUnit.MILLISECONDS);
}
private boolean awaitDisconnected(long time) throws InterruptedException
{
return disconnected.await(time, TimeUnit.MILLISECONDS);
}
public void onDisconnect(int code,String message)
{
disconnected.countDown();
}
}
}

View File

@ -745,16 +745,20 @@ public class WebSocketMessageD06Test
boolean aggregate=false; boolean aggregate=false;
private final CountDownLatch connected = new CountDownLatch(1); private final CountDownLatch connected = new CountDownLatch(1);
private final CountDownLatch disconnected = new CountDownLatch(1); private final CountDownLatch disconnected = new CountDownLatch(1);
private volatile Connection connection; private volatile FrameConnection connection;
public Connection getConnection() public Connection getConnection()
{ {
return connection; return connection;
} }
public void onConnect(Connection connection) public void onHandshake(FrameConnection connection)
{ {
this.connection = connection; this.connection = connection;
}
public void onOpen(Connection connection)
{
if (onConnect) if (onConnect)
{ {
try try
@ -779,7 +783,7 @@ public class WebSocketMessageD06Test
return disconnected.await(time, TimeUnit.MILLISECONDS); return disconnected.await(time, TimeUnit.MILLISECONDS);
} }
public void onDisconnect(int code,String message) public void onClose(int code,String message)
{ {
disconnected.countDown(); disconnected.countDown();
} }

View File

@ -1,190 +0,0 @@
package org.eclipse.jetty.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.net.Socket;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import javax.servlet.http.HttpServletRequest;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.server.nio.SelectChannelConnector;
import org.eclipse.jetty.util.StringUtil;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketMessageW75Test
{
private static Server _server;
private static Connector _connector;
private static TestWebSocket _serverWebSocket;
@BeforeClass
public static void startServer() throws Exception
{
_server = new Server();
_connector = new SelectChannelConnector();
_server.addConnector(_connector);
WebSocketHandler wsHandler = new WebSocketHandler()
{
public WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
return _serverWebSocket = new TestWebSocket();
}
};
wsHandler.setHandler(new DefaultHandler());
_server.setHandler(wsHandler);
_server.start();
}
@AfterClass
public static void stopServer() throws Exception
{
_server.stop();
_server.join();
}
@Test
public void testServerSendBigStringMessage() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "ISO-8859-1"));
String responseLine = reader.readLine();
assertTrue(responseLine.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake"));
// Read until we find an empty line, which signals the end of the http response
String line;
while ((line = reader.readLine()) != null)
if (line.length() == 0)
break;
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
// Server sends a big message
StringBuilder message = new StringBuilder();
String text = "0123456789ABCDEF";
for (int i = 0; i < 64 * 1024 / text.length(); ++i)
message.append(text);
_serverWebSocket.outbound.sendMessage(message.toString());
// Read until we get 0xFF
ByteArrayOutputStream baos = new ByteArrayOutputStream();
while (true)
{
int read = input.read();
baos.write(read);
if (read == 0xFF)
break;
}
baos.close();
byte[] bytes = baos.toByteArray();
String result = StringUtil.printable(bytes);
assertTrue(result.startsWith("0x00"));
assertTrue(result.endsWith("0xFF"));
assertEquals(message.length() + "0x00".length() + "0xFF".length(), result.length());
}
@Test
public void testServerSendBigBinaryMessage() throws Exception
{
Socket socket = new Socket("localhost", _connector.getLocalPort());
OutputStream output = socket.getOutputStream();
output.write(
("GET /test HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n").getBytes("ISO-8859-1"));
output.flush();
// Make sure the read times out if there are problems with the implementation
socket.setSoTimeout(1000);
InputStream input = socket.getInputStream();
BufferedReader reader = new BufferedReader(new InputStreamReader(input, "ISO-8859-1"));
String responseLine = reader.readLine();
assertTrue(responseLine.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake"));
// Read until we find an empty line, which signals the end of the http response
String line;
while ((line = reader.readLine()) != null)
if (line.length() == 0)
break;
assertTrue(_serverWebSocket.awaitConnected(1000));
assertNotNull(_serverWebSocket.outbound);
// Server sends a big message
StringBuilder message = new StringBuilder();
String text = "0123456789ABCDEF";
for (int i = 0; i < 64 * 1024 / text.length(); ++i)
message.append(text);
byte[] data = message.toString().getBytes("UTF-8");
_serverWebSocket.outbound.sendMessage(data,0,data.length);
// Length of the message is 65536, so the length will be encoded as 0x84 0x80 0x00
int frame = input.read();
assertEquals(0x80, frame);
int length1 = input.read();
assertEquals(0x84, length1);
int length2 = input.read();
assertEquals(0x80, length2);
int length3 = input.read();
assertEquals(0x00, length3);
int read = 0;
while (read < data.length)
{
int b = input.read();
assertTrue(b != -1);
++read;
}
}
private static class TestWebSocket implements WebSocket
{
private final CountDownLatch latch = new CountDownLatch(1);
private volatile Connection outbound;
public void onConnect(Connection outbound)
{
this.outbound = outbound;
latch.countDown();
}
private boolean awaitConnected(long time) throws InterruptedException
{
return latch.await(time, TimeUnit.MILLISECONDS);
}
public void onDisconnect(int code,String data)
{
}
}
}

View File

@ -1,190 +0,0 @@
package org.eclipse.jetty.websocket;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.http.HttpHeaderValues;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.Utf8StringBuilder;
import org.junit.Before;
import org.junit.Test;
/**
* @version $Revision$ $Date$
*/
public class WebSocketParserD01Test
{
private ByteArrayBuffer _in;
private Handler _handler;
private WebSocketParser _parser;
@Before
public void setUp() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
_handler = new Handler();
_parser=new WebSocketParserD01(buffers, endPoint,_handler);
_in = new ByteArrayBuffer(2048);
endPoint.setIn(_in);
}
@Test
public void testCache() throws Exception
{
assertEquals(HttpHeaderValues.UPGRADE_ORDINAL ,((CachedBuffer)HttpHeaderValues.CACHE.lookup("Upgrade")).getOrdinal());
}
@Test
public void testShortText() throws Exception
{
_in.put((byte)0x00);
_in.put((byte)11);
_in.put("Hello World".getBytes(StringUtil.__UTF8));
int filled =_parser.parseNext();
assertEquals(13,filled);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testShortUtf8() throws Exception
{
String string = "Hell\uFF4f W\uFF4Frld";
byte[] bytes = string.getBytes("UTF-8");
_in.put((byte)0x00);
_in.put((byte)bytes.length);
_in.put(bytes);
int filled =_parser.parseNext();
assertEquals(bytes.length+2,filled);
assertEquals(string,_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testMediumText() throws Exception
{
String string = "Hell\uFF4f Medium W\uFF4Frld ";
for (int i=0;i<4;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
_in.put((byte)0x00);
_in.put((byte)0x7E);
_in.put((byte)(bytes.length>>8));
_in.put((byte)(bytes.length&0xff));
_in.put(bytes);
int filled =_parser.parseNext();
assertEquals(bytes.length+4,filled);
assertEquals(string,_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
@Test
public void testLongText() throws Exception
{
WebSocketBuffers buffers = new WebSocketBuffers(0x20000);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint();
WebSocketParser parser=new WebSocketParserD01(buffers, endPoint,_handler);
ByteArrayBuffer in = new ByteArrayBuffer(0x20000);
endPoint.setIn(in);
String string = "Hell\uFF4f Big W\uFF4Frld ";
for (int i=0;i<12;i++)
string = string+string;
string += ". The end.";
byte[] bytes = string.getBytes("UTF-8");
in.put((byte)0x00);
in.put((byte)0x7F);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)0x00);
in.put((byte)(bytes.length>>16));
in.put((byte)((bytes.length>>8)&0xff));
in.put((byte)(bytes.length&0xff));
in.put(bytes);
int filled =parser.parseNext();
assertEquals(bytes.length+10,filled);
assertEquals(string,_handler._data.get(0));
assertTrue(parser.isBufferEmpty());
assertTrue(parser.getBuffer()==null);
}
@Test
public void testShortFragmentTest() throws Exception
{
_in.put((byte)0x80);
_in.put((byte)0x06);
_in.put("Hello ".getBytes(StringUtil.__UTF8));
_in.put((byte)0x00);
_in.put((byte)0x05);
_in.put("World".getBytes(StringUtil.__UTF8));
int filled =_parser.parseNext();
assertEquals(15,filled);
assertEquals(0,_handler._data.size());
assertFalse(_parser.isBufferEmpty());
assertFalse(_parser.getBuffer()==null);
filled =_parser.parseNext();
assertEquals(0,filled);
assertEquals("Hello World",_handler._data.get(0));
assertTrue(_parser.isBufferEmpty());
assertTrue(_parser.getBuffer()==null);
}
private class Handler implements WebSocketParser.FrameHandler
{
Utf8StringBuilder _utf8 = new Utf8StringBuilder();
public List<String> _data = new ArrayList<String>();
public void onFrame(byte flags, byte opcode, Buffer buffer)
{
if ((flags&0x8)!=0)
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
else if (_utf8.length()==0)
_data.add(opcode,buffer.toString("utf-8"));
else
{
_utf8.append(buffer.array(),buffer.getIndex(),buffer.length());
_data.add(opcode,_utf8.toString());
_utf8.reset();
}
}
public void close(int code,String message)
{
}
}
}

View File

@ -35,7 +35,7 @@ public class WebSocketChatServlet extends WebSocketServlet
{ {
Connection _connection; Connection _connection;
public void onConnect(Connection connection) public void onOpen(Connection connection)
{ {
// Log.info(this+" onConnect"); // Log.info(this+" onConnect");
_connection=connection; _connection=connection;
@ -50,7 +50,7 @@ public class WebSocketChatServlet extends WebSocketServlet
public void onMessage(String data) public void onMessage(String data)
{ {
if (data.indexOf("disconnect")>=0) if (data.indexOf("disconnect")>=0)
_connection.disconnect(0,""); _connection.disconnect();
else else
{ {
// Log.info(this+" onMessage: "+data); // Log.info(this+" onMessage: "+data);
@ -68,7 +68,7 @@ public class WebSocketChatServlet extends WebSocketServlet
} }
} }
public void onDisconnect(int code, String message) public void onClose(int code, String message)
{ {
// Log.info(this+" onDisconnect"); // Log.info(this+" onDisconnect");
_members.remove(this); _members.remove(this);