294563 Initial websocket implementation working

git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1043 7e9141cc-0065-0410-87d8-b60c137991c4
This commit is contained in:
Greg Wilkins 2009-11-12 03:27:35 +00:00
parent 1da3065043
commit 2a3750fe1e
31 changed files with 979 additions and 224 deletions

View File

@ -74,6 +74,11 @@ public class HttpConnection implements Connection
_parser = new HttpParser(responseBuffers,endp,new Handler());
}
public long getTimeStamp()
{
return -1;
}
public void setReserved (boolean reserved)
{
_reserved = reserved;

View File

@ -28,6 +28,7 @@ import org.eclipse.jetty.http.HttpVersions;
import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.ThreadLocalBuffers;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
@ -210,6 +211,11 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector,
protected void endPointClosed(SelectChannelEndPoint endpoint)
{
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
}
@Override
protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint)

View File

@ -450,12 +450,15 @@ public class HttpGenerator extends AbstractGenerator
if (_buffer!=null)
_buffer.clear();
// end the header.
_header.put(HttpTokens.CRLF);
_state = STATE_CONTENT;
return;
}
if (_status==204 || _status==304)
if (_status!=101 )
{
_header.put(HttpTokens.CRLF);
_state = STATE_CONTENT;
return;
}
}
else if (_status==204 || _status==304)
{
_noContent=true;
_content=null;

View File

@ -22,7 +22,7 @@ import java.io.IOException;
*
*
*/
public class ByteArrayEndPoint implements EndPoint
public class ByteArrayEndPoint implements ConnectedEndPoint
{
byte[] _inBytes;
ByteArrayBuffer _in;
@ -30,6 +30,7 @@ public class ByteArrayEndPoint implements EndPoint
boolean _closed;
boolean _nonBlocking;
boolean _growOutput;
Connection _connection;
/* ------------------------------------------------------------ */
/**
@ -39,6 +40,24 @@ public class ByteArrayEndPoint implements EndPoint
{
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.ConnectedEndPoint#getConnection()
*/
public Connection getConnection()
{
return _connection;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.io.ConnectedEndPoint#setConnection(org.eclipse.jetty.io.Connection)
*/
public void setConnection(Connection connection)
{
_connection=connection;
}
/* ------------------------------------------------------------ */
/**
* @return the nonBlocking

View File

@ -0,0 +1,7 @@
package org.eclipse.jetty.io;
public interface ConnectedEndPoint extends EndPoint
{
Connection getConnection();
void setConnection(Connection connection);
}

View File

@ -17,7 +17,9 @@ import java.io.IOException;
public interface Connection
{
void handle() throws IOException;
void handle() throws IOException, UpgradeConnectionException;
long getTimeStamp();
boolean isIdle();
boolean isSuspended();

View File

@ -0,0 +1,26 @@
package org.eclipse.jetty.io;
/* ------------------------------------------------------------ */
/** Upgrade Connection Exception
* This exception is thrown when processing a protocol upgrade
* to exit all the current connection handling and to
* allow the {@link ConnectedEndPoint} to handle the new exception.
*
* Code that calls {@link org.eclipse.jetty.io.Connection#handle()}
* should catch this exception and call {@link ConnectedEndPoint#setConnection(org.eclipse.jetty.io.Connection)}
* with the new connection and then immediately call handle() again.
*/
public class UpgradeConnectionException extends RuntimeException
{
Connection _connection;
public UpgradeConnectionException(Connection newConnection)
{
_connection=newConnection;
}
public Connection getConnection()
{
return _connection;
}
}

View File

@ -21,8 +21,10 @@ import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.io.nio.SelectorManager.SelectSet;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.thread.Timeout;
@ -34,7 +36,7 @@ import org.eclipse.jetty.util.thread.Timeout;
*
*
*/
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint
public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint
{
private final SelectorManager.SelectSet _selectSet;
private final SelectorManager _manager;
@ -66,12 +68,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
_key = key;
scheduleIdle();
}
/* ------------------------------------------------------------ */
public Connection getConnection()
{
return _connection;
}
/* ------------------------------------------------------------ */
public SelectorManager getSelectManager()
@ -79,10 +76,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
return _manager;
}
/* ------------------------------------------------------------ */
public Connection getConnection()
{
return _connection;
}
/* ------------------------------------------------------------ */
public void setConnection(Connection connection)
{
Connection old=_connection;
_connection=connection;
_manager.endPointUpgraded(this,old);
}
/* ------------------------------------------------------------ */
@ -217,7 +222,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException
{
int l = super.flush(header, buffer, trailer);
_writable = l!=0;
if (!(_writable=l!=0))
{
synchronized (this)
{
if (!_dispatched)
updateKey();
}
}
return l;
}
@ -228,7 +240,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
public int flush(Buffer buffer) throws IOException
{
int l = super.flush(buffer);
_writable = l!=0;
if (!(_writable=l!=0))
{
synchronized (this)
{
if (!_dispatched)
updateKey();
}
}
return l;
}
@ -445,6 +464,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable,
{
_connection.handle();
}
catch (UpgradeConnectionException e)
{
Log.debug(e.toString());
Log.ignore(e);
setConnection(e.getConnection());
continue;
}
catch (ClosedChannelException e)
{
Log.ignore(e);

View File

@ -23,6 +23,7 @@ import java.nio.channels.SocketChannel;
import java.util.ArrayList;
import java.util.List;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.component.AbstractLifeCycle;
@ -234,6 +235,9 @@ public abstract class SelectorManager extends AbstractLifeCycle
*/
protected abstract void endPointOpened(SelectChannelEndPoint endpoint);
/* ------------------------------------------------------------ */
protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection);
/* ------------------------------------------------------------------------------- */
protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint);

View File

@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpHeaders;
import org.eclipse.jetty.http.HttpSchemes;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.util.component.LifeCycle;
@ -964,7 +965,7 @@ public abstract class AbstractConnector extends HttpBuffers implements Connector
}
/* ------------------------------------------------------------ */
protected void connectionOpened(HttpConnection connection)
protected void connectionOpened(Connection connection)
{
if (_statsStartedAt==-1)
return;
@ -977,13 +978,24 @@ public abstract class AbstractConnector extends HttpBuffers implements Connector
}
/* ------------------------------------------------------------ */
protected void connectionClosed(HttpConnection connection)
protected void connectionUpgraded(Connection oldConnection,Connection newConnection)
{
int requests=(oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getRequests():0;
synchronized(_statsLock)
{
_requests+=requests;
}
}
/* ------------------------------------------------------------ */
protected void connectionClosed(Connection connection)
{
if (_statsStartedAt>=0)
{
long duration=System.currentTimeMillis()-connection.getTimeStamp();
int requests=connection.getRequests();
int requests=(connection instanceof HttpConnection)?((HttpConnection)connection).getRequests():0;
synchronized(_statsLock)
{

View File

@ -45,6 +45,7 @@ import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.UncheckedPrintWriter;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.io.BufferCache.CachedBuffer;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
import org.eclipse.jetty.util.QuotedStringTokenizer;
@ -563,6 +564,10 @@ public class HttpConnection implements Connection
server.handleAsync(this);
}
}
catch (UpgradeConnectionException e)
{
throw e;
}
catch (ContinuationThrowable e)
{
Log.ignore(e);

View File

@ -20,7 +20,10 @@ import java.util.concurrent.LinkedBlockingQueue;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
public class LocalConnector extends AbstractConnector
{
@ -101,17 +104,41 @@ public class LocalConnector extends AbstractConnector
public void run()
{
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(requestsBuffer.asArray(), 1024);
ByteArrayEndPoint endPoint = new ByteArrayEndPoint(requestsBuffer.asArray(), 1024)
{
@Override
public void setConnection(Connection connection)
{
connectionUpgraded(getConnection(),connection);
super.setConnection(connection);
}
};
endPoint.setGrowOutput(true);
HttpConnection connection = new HttpConnection(LocalConnector.this, endPoint, getServer());
endPoint.setConnection(connection);
connectionOpened(connection);
boolean leaveOpen = keepOpen;
try
{
while (endPoint.getIn().length() > 0)
connection.handle();
{
while(true)
{
try
{
endPoint.getConnection().handle();
break;
}
catch (UpgradeConnectionException e)
{
Log.debug(e.toString());
Log.ignore(e);
endPoint.setConnection(e.getConnection());
}
}
}
}
catch (Exception x)
{

View File

@ -23,8 +23,11 @@ import java.util.Set;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.io.bio.SocketEndPoint;
import org.eclipse.jetty.server.AbstractConnector;
import org.eclipse.jetty.server.HttpConnection;
@ -47,7 +50,7 @@ import org.eclipse.jetty.util.log.Log;
public class SocketConnector extends AbstractConnector
{
protected ServerSocket _serverSocket;
protected final Set _connections;
protected final Set<EndPoint> _connections;
/* ------------------------------------------------------------ */
/** Constructor.
@ -55,7 +58,7 @@ public class SocketConnector extends AbstractConnector
*/
public SocketConnector()
{
_connections=new HashSet();
_connections=new HashSet<EndPoint>();
}
/* ------------------------------------------------------------ */
@ -99,7 +102,7 @@ public class SocketConnector extends AbstractConnector
Socket socket = _serverSocket.accept();
configure(socket);
Connection connection=new Connection(socket);
ConnectorEndPoint connection=new ConnectorEndPoint(socket);
connection.dispatch();
}
@ -117,7 +120,7 @@ public class SocketConnector extends AbstractConnector
public void customize(EndPoint endpoint, Request request)
throws IOException
{
Connection connection = (Connection)endpoint;
ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime;
if (connection._sotimeout!=lrmit)
{
@ -159,7 +162,7 @@ public class SocketConnector extends AbstractConnector
Iterator iter=set.iterator();
while(iter.hasNext())
{
Connection connection = (Connection)iter.next();
ConnectorEndPoint connection = (ConnectorEndPoint)iter.next();
connection.close();
}
}
@ -167,20 +170,32 @@ public class SocketConnector extends AbstractConnector
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
protected class Connection extends SocketEndPoint implements Runnable
protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint
{
boolean _dispatched=false;
HttpConnection _connection;
volatile Connection _connection;
int _sotimeout;
protected Socket _socket;
protected final Socket _socket;
public Connection(Socket socket) throws IOException
public ConnectorEndPoint(Socket socket) throws IOException
{
super(socket);
_connection = newHttpConnection(this);
_sotimeout=socket.getSoTimeout();
_socket=socket;
}
public Connection getConnection()
{
return _connection;
}
public void setConnection(Connection connection)
{
if (_connection!=connection)
connectionUpgraded(_connection,connection);
_connection=connection;
}
public void dispatch() throws IOException
{
@ -203,7 +218,8 @@ public class SocketConnector extends AbstractConnector
@Override
public void close() throws IOException
{
_connection.getRequest().getAsyncContinuation().cancel();
if (_connection instanceof HttpConnection)
((HttpConnection)_connection).getRequest().getAsyncContinuation().cancel();
super.close();
}
@ -231,7 +247,17 @@ public class SocketConnector extends AbstractConnector
}
}
}
_connection.handle();
try
{
_connection.handle();
}
catch (UpgradeConnectionException e)
{
Log.debug(e.toString());
Log.ignore(e);
setConnection(e.getConnection());
continue;
}
}
}
catch (EofException e)

View File

@ -21,8 +21,11 @@ import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.io.nio.ChannelEndPoint;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request;
@ -89,7 +92,7 @@ public class BlockingChannelConnector extends AbstractNIOConnector
Socket socket=channel.socket();
configure(socket);
Connection connection=new Connection(channel);
ConnectorEndPoint connection=new ConnectorEndPoint(channel);
connection.dispatch();
}
@ -98,7 +101,7 @@ public class BlockingChannelConnector extends AbstractNIOConnector
public void customize(EndPoint endpoint, Request request)
throws IOException
{
Connection connection = (Connection)endpoint;
ConnectorEndPoint connection = (ConnectorEndPoint)endpoint;
if (connection._sotimeout!=_maxIdleTime)
{
connection._sotimeout=_maxIdleTime;
@ -121,24 +124,41 @@ public class BlockingChannelConnector extends AbstractNIOConnector
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
/* ------------------------------------------------------------------------------- */
private class Connection extends ChannelEndPoint implements Runnable
private class ConnectorEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint
{
final HttpConnection _connection;
Connection _connection;
boolean _dispatched=false;
int _sotimeout;
Connection(ByteChannel channel)
ConnectorEndPoint(ByteChannel channel)
{
super(channel);
_connection = new HttpConnection(BlockingChannelConnector.this,this,getServer());
}
/* ------------------------------------------------------------ */
/** Get the connection.
* @return the connection
*/
public Connection getConnection()
{
return _connection;
}
/* ------------------------------------------------------------ */
public void setConnection(Connection connection)
{
_connection=connection;
}
/* ------------------------------------------------------------ */
void dispatch() throws IOException
{
if (!getThreadPool().dispatch(this))
{
Log.warn("dispatch failed for {}",_connection);
Connection.this.close();
ConnectorEndPoint.this.close();
}
}
@ -162,25 +182,35 @@ public class BlockingChannelConnector extends AbstractNIOConnector
}
}
}
_connection.handle();
try
{
_connection.handle();
}
catch (UpgradeConnectionException e)
{
Log.debug(e.toString());
Log.ignore(e);
setConnection(e.getConnection());
continue;
}
}
}
catch (EofException e)
{
Log.debug("EOF", e);
try{Connection.this.close();}
try{ConnectorEndPoint.this.close();}
catch(IOException e2){Log.ignore(e2);}
}
catch (HttpException e)
{
Log.debug("BAD", e);
try{Connection.this.close();}
try{ConnectorEndPoint.this.close();}
catch(IOException e2){Log.ignore(e2);}
}
catch(Throwable e)
{
Log.warn("handle failed",e);
try{Connection.this.close();}
try{ConnectorEndPoint.this.close();}
catch(IOException e2){Log.ignore(e2);}
}
finally

View File

@ -20,6 +20,7 @@ import java.nio.channels.SelectionKey;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.io.nio.SelectChannelEndPoint;
@ -89,14 +90,20 @@ public class SelectChannelConnector extends AbstractNIOConnector
@Override
protected void endPointClosed(final SelectChannelEndPoint endpoint)
{
connectionClosed((HttpConnection)endpoint.getConnection());
connectionClosed(endpoint.getConnection());
}
@Override
protected void endPointOpened(SelectChannelEndPoint endpoint)
{
// TODO handle max connections and low resources
connectionOpened((HttpConnection)endpoint.getConnection());
connectionOpened(endpoint.getConnection());
}
@Override
protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection)
{
connectionUpgraded(oldConnection,endpoint.getConnection());
}
@Override

View File

@ -164,7 +164,7 @@ public class SslSocketConnector extends SocketConnector implements SslConnector
Socket socket = _serverSocket.accept();
configure(socket);
Connection connection=new SslConnection(socket);
ConnectorEndPoint connection=new SslConnection(socket);
connection.dispatch();
}
@ -670,7 +670,7 @@ public class SslSocketConnector extends SocketConnector implements SslConnector
/* ------------------------------------------------------------ */
public class SslConnection extends Connection
public class SslConnection extends ConnectorEndPoint
{
public SslConnection(Socket socket) throws IOException
{

View File

@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpException;
import org.eclipse.jetty.http.PathMap;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.security.IdentityService;
import org.eclipse.jetty.security.SecurityHandler;
import org.eclipse.jetty.server.Dispatcher;
@ -438,6 +439,10 @@ public class ServletHandler extends ScopedHandler
{
throw e;
}
catch(UpgradeConnectionException e)
{
throw e;
}
catch(Exception e)
{
if (!(DispatcherType.REQUEST.equals(type) || DispatcherType.ASYNC.equals(type)))

View File

@ -337,6 +337,24 @@ public class StringUtil
return buf.toString();
}
/* ------------------------------------------------------------ */
public static String printable(byte[] b)
{
StringBuilder buf = new StringBuilder();
for (int i=0;i<b.length;i++)
{
char c=(char)b[i];
if (Character.isWhitespace(c)|| c>' ' && c<0x7f)
buf.append(c);
else
{
buf.append("0x");
TypeUtil.toHex(b[i],buf);
}
}
return buf.toString();
}
public static byte[] getBytes(String s)
{
try

View File

@ -447,6 +447,20 @@ public class TypeUtil
return 0;
}
/* ------------------------------------------------------------ */
public static void toHex(byte b,StringBuilder buf)
{
int bi=0xff&b;
int c='0'+(bi/16)%16;
if (c>'9')
c= 'A'+(c-'0'-10);
buf.append((char)c);
c='0'+bi%16;
if (c>'9')
c= 'A'+(c-'0'-10);
buf.append((char)c);
}
/* ------------------------------------------------------------ */
public static String toHexString(byte[] b)
{

View File

@ -95,12 +95,14 @@ public class WebAppContext extends ServletContextHandler
"org.eclipse.jetty.jndi.", // webapp cannot change naming classes
"org.eclipse.jetty.plus.jaas.", // webapp cannot change jetty jaas classes
"org.eclipse.jetty.servlet.DefaultServlet", // webapp cannot change default servlets
"org.eclipse.jetty.websocket.", // WebSocket is a jetty extension
};
private String[] _serverClasses = {
"-org.eclipse.jetty.continuation.", // don't hide continuation classes
"-org.eclipse.jetty.jndi.", // don't hide naming classes
"-org.eclipse.jetty.plus.jaas.", // don't hide jaas modules
"-org.eclipse.jetty.servlet.DefaultServlet", // webapp cannot change default servlets
"-org.eclipse.jetty.websocket.", // don't hide websocket extension
"org.eclipse.jetty." // hide rest of jetty classes
};
private File _tmpDir;

View File

@ -11,7 +11,7 @@
<dependencies>
<dependency>
<groupId>org.eclipse.jetty</groupId>
<artifactId>jetty-io</artifactId>
<artifactId>jetty-server</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
@ -19,6 +19,11 @@
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
<type>jar</type>
</dependency>
</dependencies>
<build>
<plugins>

View File

@ -0,0 +1,21 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
public interface WebSocket
{
public final byte LENGTH_FRAME=(byte)0x80;
public final byte SENTINEL_FRAME=(byte)0x00;
void onConnect(Outbound outbound);
void onMessage(byte frame,String data);
void onMessage(byte frame,byte[] data, int offset, int length);
void onDisconnect();
public interface Outbound
{
void sendMessage(byte frame,String data) throws IOException;
void sendMessage(byte frame,byte[] data) throws IOException;
void sendMessage(byte frame,byte[] data, int offset, int length) throws IOException;
void disconnect() throws IOException;
}
}

View File

@ -0,0 +1,59 @@
package org.eclipse.jetty.websocket;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ThreadLocalBuffers;
import org.eclipse.jetty.io.nio.DirectNIOBuffer;
/* ------------------------------------------------------------ */
/** The WebSocket Buffer Pool.
*
* The normal buffers are byte array buffers so that user processes
* can access directly. However the generator uses direct buffers
* for the final output stage as they are filled in bulk and are more
* effecient to flush.
*/
public class WebSocketBuffers
{
final private ThreadLocalBuffers _buffers;
public WebSocketBuffers(final int bufferSize)
{
_buffers = new ThreadLocalBuffers()
{
@Override
protected Buffer newHeader(int size)
{
return new DirectNIOBuffer(bufferSize);
}
@Override
protected Buffer newBuffer(int size)
{
return new ByteArrayBuffer(bufferSize);
}
@Override
protected boolean isHeader(Buffer buffer)
{
return buffer instanceof DirectNIOBuffer;
}
};
}
public Buffer getBuffer()
{
return _buffers.getBuffer();
}
public Buffer getDirectBuffer()
{
return _buffers.getHeader();
}
public void returnBuffer(Buffer buffer)
{
_buffers.returnBuffer(buffer);
}
}

View File

@ -2,13 +2,64 @@ package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Connection;
import org.eclipse.jetty.io.EndPoint;
import org.eclipse.jetty.util.log.Log;
public class WebSocketConnection implements Connection
public class WebSocketConnection implements Connection, WebSocket.Outbound
{
WebSocketParser _parser;
WebSocketGenerator _generator;
final EndPoint _endp;
final WebSocketParser _parser;
final WebSocketGenerator _generator;
final long _timestamp;
final WebSocket _websocket;
final int _maxIdleTimeMs=30000;
public WebSocketConnection(WebSocketBuffers buffers, EndPoint endpoint, long timestamp, WebSocket websocket)
{
_endp = endpoint;
_timestamp = timestamp;
_websocket = websocket;
_generator = new WebSocketGenerator(buffers, _endp);
_parser = new WebSocketParser(buffers, endpoint, new WebSocketParser.EventHandler()
{
public void onFrame(byte frame, String data)
{
try
{
_websocket.onMessage(frame,data);
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
public void onFrame(byte frame, Buffer buffer)
{
try
{
byte[] array=buffer.array();
_websocket.onMessage(frame,array,buffer.getIndex(),buffer.length());
}
catch(ThreadDeath th)
{
throw th;
}
catch(Throwable th)
{
Log.warn(th);
}
}
});
}
public void handle() throws IOException
{
boolean more=true;
@ -18,7 +69,7 @@ public class WebSocketConnection implements Connection
int flushed=_generator.flush();
int filled=_parser.parseNext();
more = flushed>0 || filled>0 || !_parser.isBufferEmpty();
more = flushed>0 || filled>0 || !_parser.isBufferEmpty() || !_generator.isBufferEmpty();
}
}
@ -32,4 +83,38 @@ public class WebSocketConnection implements Connection
return false;
}
public long getTimeStamp()
{
return _timestamp;
}
public void sendMessage(byte frame, String content) throws IOException
{
_generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.flush();
}
public void sendMessage(byte frame, byte[] content) throws IOException
{
_generator.addFrame(frame,content,_maxIdleTimeMs);
_generator.flush();
}
public void sendMessage(byte frame, byte[] content, int offset, int length) throws IOException
{
_generator.addFrame(frame,content,offset,length,_maxIdleTimeMs);
_generator.flush();
}
public void disconnect() throws IOException
{
_generator.flush(_maxIdleTimeMs);
_endp.close();
}
public void fill(Buffer buffer)
{
_parser.fill(buffer);
}
}

View File

@ -3,7 +3,6 @@ package org.eclipse.jetty.websocket;
import java.io.IOException;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.EndPoint;
@ -16,157 +15,173 @@ import org.eclipse.jetty.io.EndPoint;
*/
public class WebSocketGenerator
{
final private Buffers _buffers;
final private WebSocketBuffers _buffers;
final private EndPoint _endp;
private Buffer _buffer;
public WebSocketGenerator(Buffers buffers, EndPoint endp)
public WebSocketGenerator(WebSocketBuffers buffers, EndPoint endp)
{
_buffers=buffers;
_endp=endp;
}
synchronized public boolean addMessage(byte frame,Buffer content, long blockFor) throws IOException
synchronized public void addFrame(byte frame,byte[] content, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
else if (_buffer.length()>0)
flushBuffer();
int length=content.length();
if (length>2097152)
throw new IllegalArgumentException("too big");
int length_bytes=(length>16384)?3:(length>128)?2:1;
if (_buffer.space()<length+1+length_bytes)
{
// TODO block if there can be space
throw new IllegalArgumentException("no space");
}
_buffer.put((byte)(0x80|frame));
switch (length_bytes)
{
case 3:
_buffer.put((byte)(0x80|(length>>14)));
case 2:
_buffer.put((byte)(0x80|(0x7f&(length>>7))));
case 1:
_buffer.put((byte)(0x7f&length));
}
_buffer.put(content);
return true;
}
synchronized public boolean addMessage(byte frame, String content, long blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
else if (_buffer.length()>0)
flushBuffer();
int length=content.length();
int space=waitForSpace(length+2,blockFor);
_buffer.put((byte)(0x7f&frame));
for (int i = 0; i < length; i++)
{
int code = content.charAt(i);
if ((code & 0xffffff80) == 0)
{
// 1b
if (space<1)
space=waitForSpace(1,blockFor);
_buffer.put((byte)(code));
space--;
}
else if((code&0xfffff800)==0)
{
// 2b
if (space<2)
space=waitForSpace(2,blockFor);
_buffer.put((byte)(0xc0|(code>>6)));
_buffer.put((byte)(0x80|(code&0x3f)));
space-=2;
}
else if((code&0xffff0000)==0)
{
// 3b
if (space<3)
space=waitForSpace(3,blockFor);
_buffer.put((byte)(0xe0|(code>>12)));
_buffer.put((byte)(0x80|((code>>6)&0x3f)));
_buffer.put((byte)(0x80|(code&0x3f)));
space-=3;
}
else if((code&0xff200000)==0)
{
// 4b
if (space<4)
space=waitForSpace(4,blockFor);
_buffer.put((byte)(0xf0|(code>>18)));
_buffer.put((byte)(0x80|((code>>12)&0x3f)));
_buffer.put((byte)(0x80|((code>>6)&0x3f)));
_buffer.put((byte)(0x80|(code&0x3f)));
space-=4;
}
else if((code&0xf4000000)==0)
{
// 5b
if (space<5)
space=waitForSpace(5,blockFor);
_buffer.put((byte)(0xf8|(code>>24)));
_buffer.put((byte)(0x80|((code>>18)&0x3f)));
_buffer.put((byte)(0x80|((code>>12)&0x3f)));
_buffer.put((byte)(0x80|((code>>6)&0x3f)));
_buffer.put((byte)(0x80|(code&0x3f)));
space-=5;
}
else if((code&0x80000000)==0)
{
// 6b
if (space<6)
space=waitForSpace(6,blockFor);
_buffer.put((byte)(0xfc|(code>>30)));
_buffer.put((byte)(0x80|((code>>24)&0x3f)));
_buffer.put((byte)(0x80|((code>>18)&0x3f)));
_buffer.put((byte)(0x80|((code>>12)&0x3f)));
_buffer.put((byte)(0x80|((code>>6)&0x3f)));
_buffer.put((byte)(0x80|(code&0x3f)));
space-=6;
}
else
{
_buffer.put((byte)('?'));
space-=1;
}
}
if (space<1)
space=waitForSpace(1,blockFor);
_buffer.put((byte)(0xff));
return true;
addFrame(frame,content,0,content.length,blockFor);
}
private int waitForSpace(int needed, long blockFor)
synchronized public void addFrame(byte frame,byte[] content, int offset, int length, int blockFor) throws IOException
{
if (_buffer==null)
_buffer=_buffers.getDirectBuffer();
if ((frame&0x80)==0x80)
{
// Send in a length delimited frame
// maximum of 3 byte length == 21 bits
if (length>2097152)
throw new IllegalArgumentException("too big");
int length_bytes=(length>16384)?3:(length>128)?2:1;
int needed=length+1+length_bytes;
checkSpace(needed,blockFor);
_buffer.put(frame);
switch (length_bytes)
{
case 3:
_buffer.put((byte)(0x80|(length>>14)));
case 2:
_buffer.put((byte)(0x80|(0x7f&(length>>7))));
case 1:
_buffer.put((byte)(0x7f&length));
}
_buffer.put(content,offset,length);
}
else
{
// send in a sentinel frame
int needed=length+2;
checkSpace(needed,blockFor);
_buffer.put(frame);
_buffer.put(content,offset,length);
_buffer.put((byte)0xFF);
}
}
synchronized public void addFrame(byte frame, String content, int blockFor) throws IOException
{
Buffer byte_buffer=_buffers.getBuffer();
try
{
byte[] array=byte_buffer.array();
int chars = content.length();
int bytes = 0;
final int limit=array.length-6;
for (int i = 0; i < chars; i++)
{
int code = content.charAt(i);
if (bytes>=limit)
throw new IllegalArgumentException("frame too large");
if ((code & 0xffffff80) == 0)
{
array[bytes++]=(byte)(code);
}
else if((code&0xfffff800)==0)
{
array[bytes++]=(byte)(0xc0|(code>>6));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0xffff0000)==0)
{
array[bytes++]=(byte)(0xe0|(code>>12));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0xff200000)==0)
{
array[bytes++]=(byte)(0xf0|(code>>18));
array[bytes++]=(byte)(0x80|((code>>12)&0x3f));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0xf4000000)==0)
{
array[bytes++]=(byte)(0xf8|(code>>24));
array[bytes++]=(byte)(0x80|((code>>18)&0x3f));
array[bytes++]=(byte)(0x80|((code>>12)&0x3f));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else if((code&0x80000000)==0)
{
array[bytes++]=(byte)(0xfc|(code>>30));
array[bytes++]=(byte)(0x80|((code>>24)&0x3f));
array[bytes++]=(byte)(0x80|((code>>18)&0x3f));
array[bytes++]=(byte)(0x80|((code>>12)&0x3f));
array[bytes++]=(byte)(0x80|((code>>6)&0x3f));
array[bytes++]=(byte)(0x80|(code&0x3f));
}
else
{
array[bytes++]=(byte)('?');
}
}
addFrame(frame,array,0,bytes,blockFor);
}
finally
{
_buffers.returnBuffer(byte_buffer);
}
}
private void checkSpace(int needed, long blockFor)
throws IOException
{
int space=_buffer.space();
if (space<needed)
{
_buffer.compact();
space=_buffer.space();
if (_endp.isBlocking())
{
try
{
flushBuffer();
_buffer.compact();
space=_buffer.space();
}
catch(IOException e)
{
throw e;
}
}
else
{
flushBuffer();
_buffer.compact();
space=_buffer.space();
if (space<needed && _buffer.length()>0 && _endp.blockWritable(blockFor))
{
flushBuffer();
_buffer.compact();
space=_buffer.space();
}
}
if (space<needed)
// TODO flush and wait for space
throw new IllegalStateException("no space");
{
_endp.close();
throw new IOException("Full Timeout");
}
}
return space;
}
synchronized public int flush(long blockFor)
@ -177,7 +192,7 @@ public class WebSocketGenerator
synchronized public int flush() throws IOException
{
int flushed = flushBuffer();
if (_buffer.length()==0)
if (_buffer!=null && _buffer.length()==0)
{
_buffers.returnBuffer(_buffer);
_buffer=null;
@ -204,5 +219,4 @@ public class WebSocketGenerator
{
return _buffer==null || _buffer.length()==0;
}
}

View File

@ -0,0 +1,101 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.server.HttpConnection;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.handler.HandlerWrapper;
public abstract class WebSocketHandler extends HandlerWrapper
{
private WebSocketBuffers _buffers = new WebSocketBuffers(8192);
private int _bufferSize=8192;
/* ------------------------------------------------------------ */
/** Get the bufferSize.
* @return the bufferSize
*/
public int getBufferSize()
{
return _bufferSize;
}
/* ------------------------------------------------------------ */
/** Set the bufferSize.
* @param bufferSize the bufferSize to set
*/
public void setBufferSize(int bufferSize)
{
_bufferSize = bufferSize;
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.handler.HandlerWrapper#doStart()
*/
@Override
protected void doStart() throws Exception
{
_buffers=new WebSocketBuffers(_bufferSize);
super.doStart();
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.server.handler.HandlerWrapper#doStop()
*/
@Override
protected void doStop() throws Exception
{
super.doStop();
_buffers=null;
}
/* ------------------------------------------------------------ */
@Override
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
{
if ("WebSocket".equals(request.getHeader("Upgrade")) &&
"HTTP/1.1".equals(request.getProtocol()))
{
WebSocket websocket=doWebSocketConnect(request,request.getHeader("WebSocket-Protocol"));
if (websocket!=null)
{
HttpConnection http = HttpConnection.getCurrentConnection();
ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint();
WebSocketConnection connection = new WebSocketConnection(_buffers,endp,http.getTimeStamp(),websocket);
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
response.sendError(101,"Web Socket Protocol Handshake");
response.flushBuffer();
connection.fill(((HttpParser)http.getParser()).getHeaderBuffer());
connection.fill(((HttpParser)http.getParser()).getBodyBuffer());
websocket.onConnect(connection);
throw new UpgradeConnectionException(connection);
}
else
{
response.sendError(503);
}
}
else
{
super.handle(target,baseRequest,request,response);
}
}
abstract protected WebSocket doWebSocketConnect(HttpServletRequest request,String protocol);
}

View File

@ -23,7 +23,7 @@ public class WebSocketParser
public static final int STATE_LENGTH=2;
public static final int STATE_DATA=3;
private final Buffers _buffers;
private final WebSocketBuffers _buffers;
private final EndPoint _endp;
private final EventHandler _handler;
private int _state;
@ -40,7 +40,7 @@ public class WebSocketParser
* @param endp
* @param handler
*/
public WebSocketParser(Buffers buffers, EndPoint endp, EventHandler handler)
public WebSocketParser(WebSocketBuffers buffers, EndPoint endp, EventHandler handler)
{
_buffers=buffers;
_endp=endp;
@ -134,7 +134,7 @@ public class WebSocketParser
String data=_utf8.toString();
_utf8.reset();
_state=STATE_START;
_handler.onUtf8Message(_frame,data);
_handler.onFrame(_frame,data);
_buffer.setMarkIndex(-1);
if (_buffer.length()==0)
{
@ -163,7 +163,7 @@ public class WebSocketParser
Buffer data=_buffer.sliceFromMark(_length);
_buffer.skip(_length);
_state=STATE_START;
_handler.onBinaryMessage(_frame,data);
_handler.onFrame(_frame,data);
if (_buffer.length()==0)
{
@ -177,12 +177,27 @@ public class WebSocketParser
}
}
/* ------------------------------------------------------------ */
public void fill(Buffer buffer)
{
if (buffer!=null && buffer.length()>0)
{
if (_buffer==null)
_buffer=_buffers.getBuffer();
_buffer.put(buffer);
buffer.clear();
}
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
public interface EventHandler
{
void onUtf8Message(byte frame,String data);
void onBinaryMessage(byte frame,Buffer buffer);
void onFrame(byte frame,String data);
void onFrame(byte frame,Buffer buffer);
}
}

View File

@ -0,0 +1,83 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import javax.servlet.ServletException;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.io.ConnectedEndPoint;
import org.eclipse.jetty.io.UpgradeConnectionException;
import org.eclipse.jetty.server.HttpConnection;
/* ------------------------------------------------------------ */
/**
* Servlet to ugrade connections to WebSocket
* <p>
* The request must have the correct upgrade headers, else it is
* handled as a normal servlet request.
* <p>
* The initParameter "bufferSize" can be used to set the buffer size,
* which is also the max frame byte size (default 8192).
*/
public abstract class WebSocketServlet extends HttpServlet
{
WebSocketBuffers _buffers;
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.GenericServlet#init()
*/
@Override
public void init() throws ServletException
{
String bs=getInitParameter("bufferSize");
_buffers = new WebSocketBuffers(bs==null?8192:Integer.parseInt(bs));
}
/* ------------------------------------------------------------ */
/**
* @see javax.servlet.http.HttpServlet#service(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse)
*/
@Override
protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException
{
if ("WebSocket".equals(request.getHeader("Upgrade")) &&
"HTTP/1.1".equals(request.getProtocol()))
{
WebSocket websocket=doWebSocketConnect(request,request.getHeader("WebSocket-Protocol"));
if (websocket!=null)
{
HttpConnection http = HttpConnection.getCurrentConnection();
ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint();
WebSocketConnection connection = new WebSocketConnection(_buffers,endp,http.getTimeStamp(),websocket);
response.setHeader("Upgrade","WebSocket");
response.addHeader("Connection","Upgrade");
response.sendError(101,"Web Socket Protocol Handshake");
response.flushBuffer();
connection.fill(((HttpParser)http.getParser()).getHeaderBuffer());
connection.fill(((HttpParser)http.getParser()).getBodyBuffer());
websocket.onConnect(connection);
throw new UpgradeConnectionException(connection);
}
else
{
response.sendError(503);
}
}
else
super.service(request,response);
}
abstract protected WebSocket doWebSocketConnect(HttpServletRequest request,String protocol);
}

View File

@ -1,17 +1,11 @@
package org.eclipse.jetty.websocket;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.util.StringUtil;
import junit.framework.TestCase;
/* ------------------------------------------------------------ */
/**
@ -19,7 +13,7 @@ import junit.framework.TestCase;
public class WebSocketGeneratorTest extends TestCase
{
Buffers _buffers;
WebSocketBuffers _buffers;
ByteArrayBuffer _out;
ByteArrayEndPoint _endp;
WebSocketGenerator _generator;
@ -28,17 +22,17 @@ public class WebSocketGeneratorTest extends TestCase
@Override
protected void setUp() throws Exception
{
_buffers=new SimpleBuffers(null,new ByteArrayBuffer(1024));
_buffers=new WebSocketBuffers(1024);
_endp = new ByteArrayEndPoint();
_generator = new WebSocketGenerator(_buffers,_endp);
_out = new ByteArrayBuffer(2048);
_endp.setOut(_out);
}
/* ------------------------------------------------------------ */
public void testOneString() throws Exception
{
_generator.addMessage((byte)0x04,"Hell\uFF4F W\uFF4Frld",0);
_generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0);
_generator.flush();
assertEquals(4,_out.get());
assertEquals('H',_out.get());
@ -61,7 +55,7 @@ public class WebSocketGeneratorTest extends TestCase
public void testOneBuffer() throws Exception
{
_generator.addMessage((byte)0x04,new ByteArrayBuffer("Hell\uFF4F W\uFF4Frld",StringUtil.__UTF8),0);
_generator.addFrame((byte)0x84,"Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8),0);
_generator.flush();
assertEquals(0x84,0xff&_out.get());
assertEquals(15,0xff&_out.get());
@ -88,8 +82,7 @@ public class WebSocketGeneratorTest extends TestCase
for (int i=0;i<b.length;i++)
b[i]=(byte)('0'+(i%10));
_generator.addMessage((byte)0x05,new ByteArrayBuffer(b),0);
_generator.addFrame((byte)0x85,b,0);
_generator.flush();
assertEquals(0x85,0xff&_out.get());
@ -98,7 +91,4 @@ public class WebSocketGeneratorTest extends TestCase
for (int i=0;i<b.length;i++)
assertEquals('0'+(i%10),0xff&_out.get());
}
}

View File

@ -3,23 +3,20 @@ package org.eclipse.jetty.websocket;
import java.util.ArrayList;
import java.util.List;
import junit.framework.TestCase;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.Buffers;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.io.ByteArrayEndPoint;
import org.eclipse.jetty.io.SimpleBuffers;
import org.eclipse.jetty.util.StringUtil;
import junit.framework.TestCase;
/* ------------------------------------------------------------ */
/**
*/
public class WebSocketParserTest extends TestCase
{
Buffers _buffers;
{
WebSocketBuffers _buffers;
ByteArrayBuffer _in;
ByteArrayEndPoint _endp;
Handler _handler;
@ -29,7 +26,7 @@ public class WebSocketParserTest extends TestCase
@Override
protected void setUp() throws Exception
{
_buffers=new SimpleBuffers(null,new ByteArrayBuffer(1024));
_buffers=new WebSocketBuffers(1024);
_endp = new ByteArrayEndPoint();
_handler = new Handler();
_parser=new WebSocketParser(_buffers,_endp,_handler);
@ -139,12 +136,12 @@ public class WebSocketParserTest extends TestCase
{
public List<String> _data = new ArrayList<String>();
public void onBinaryMessage(byte frame, Buffer buffer)
public void onFrame(byte frame, Buffer buffer)
{
_data.add(buffer.toString(StringUtil.__UTF8));
}
public void onUtf8Message(byte frame, String data)
public void onFrame(byte frame, String data)
{
_data.add(data);
}

View File

@ -0,0 +1,141 @@
package org.eclipse.jetty.websocket;
import java.io.IOException;
import javax.servlet.http.HttpServletRequest;
import junit.framework.TestCase;
import org.eclipse.jetty.io.Buffer;
import org.eclipse.jetty.io.ByteArrayBuffer;
import org.eclipse.jetty.server.LocalConnector;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.handler.DefaultHandler;
import org.eclipse.jetty.util.StringUtil;
import org.eclipse.jetty.util.log.Log;
public class WebSocketTest extends TestCase
{
TestWebSocket _websocket;
LocalConnector _connector;
Server _server;
WebSocketHandler _handler;
/* ------------------------------------------------------------ */
@Override
protected void setUp() throws Exception
{
_server = new Server();
_connector = new LocalConnector();
_server.addConnector(_connector);
_handler= new WebSocketHandler()
{
@Override
protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol)
{
_websocket = new TestWebSocket();
return _websocket;
}
};
_handler.setHandler(new DefaultHandler());
_server.setHandler(_handler);
_server.start();
}
public void testNoWebSocket() throws Exception
{
String response = _connector.getResponses(
"GET /foo HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"\r\n",false);
assertTrue(response.startsWith("HTTP/1.1 404 "));
}
/* ------------------------------------------------------------ */
public void testOpenWebSocket() throws Exception
{
String response = _connector.getResponses(
"GET /demo HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n",false);
assertTrue(response.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake"));
assertTrue(response.contains("Upgrade: WebSocket"));
assertTrue(response.contains("Connection: Upgrade"));
}
/* ------------------------------------------------------------ */
public void testSendReceiveUtf8WebSocket() throws Exception
{
ByteArrayBuffer buffer = new ByteArrayBuffer(1024);
buffer.put(
("GET /demo HTTP/1.1\r\n" +
"Host: localhost\r\n" +
"Upgrade: WebSocket\r\n" +
"Connection: Upgrade\r\n" +
"\r\n").getBytes(StringUtil.__ISO_8859_1));
buffer.put((byte)0);
buffer.put("Hello World".getBytes(StringUtil.__UTF8));
buffer.put((byte)0xFF);
ByteArrayBuffer out = _connector.getResponses(buffer,true);
String response = StringUtil.printable(out.asArray());
assertTrue(response.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake"));
assertTrue(response.contains("Upgrade: WebSocket"));
assertTrue(response.contains("Connection: Upgrade"));
assertTrue(response.contains("0x00Roger That0xFF"));
assertEquals("Hello World",_websocket._utf8);
}
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
class TestWebSocket implements WebSocket
{
Outbound _outbound;
Buffer _binary;
String _utf8;
boolean _disconnected;
public void onConnect(Outbound outbound)
{
_outbound=outbound;
try
{
_outbound.sendMessage(SENTINEL_FRAME,"Roger That");
}
catch (IOException e)
{
Log.warn(e);
}
}
public void onMessage(byte frame, byte[] data,int offset, int length)
{
_binary=new ByteArrayBuffer(data,offset,length).duplicate(Buffer.READONLY);
}
public void onMessage(byte frame, String data)
{
_utf8=data;
}
public void onDisconnect()
{
_disconnected=true;
}
}
}