From 2a3750fe1e8a73e2d4427545d7123860d678e1a8 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 12 Nov 2009 03:27:35 +0000 Subject: [PATCH] 294563 Initial websocket implementation working git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@1043 7e9141cc-0065-0410-87d8-b60c137991c4 --- .../eclipse/jetty/client/HttpConnection.java | 5 + .../eclipse/jetty/client/SelectConnector.java | 6 + .../org/eclipse/jetty/http/HttpGenerator.java | 13 +- .../eclipse/jetty/io/ByteArrayEndPoint.java | 21 +- .../eclipse/jetty/io/ConnectedEndPoint.java | 7 + .../java/org/eclipse/jetty/io/Connection.java | 4 +- .../jetty/io/UpgradeConnectionException.java | 26 ++ .../jetty/io/nio/SelectChannelEndPoint.java | 44 ++- .../eclipse/jetty/io/nio/SelectorManager.java | 4 + .../jetty/server/AbstractConnector.java | 20 +- .../eclipse/jetty/server/HttpConnection.java | 5 + .../eclipse/jetty/server/LocalConnector.java | 35 ++- .../jetty/server/bio/SocketConnector.java | 48 ++- .../server/nio/BlockingChannelConnector.java | 50 ++- .../server/nio/SelectChannelConnector.java | 11 +- .../jetty/server/ssl/SslSocketConnector.java | 4 +- .../eclipse/jetty/servlet/ServletHandler.java | 5 + .../org/eclipse/jetty/util/StringUtil.java | 18 ++ .../java/org/eclipse/jetty/util/TypeUtil.java | 14 + .../eclipse/jetty/webapp/WebAppContext.java | 2 + jetty-websocket/pom.xml | 7 +- .../eclipse/jetty/websocket/WebSocket.java | 21 ++ .../jetty/websocket/WebSocketBuffers.java | 59 ++++ .../jetty/websocket/WebSocketConnection.java | 95 +++++- .../jetty/websocket/WebSocketGenerator.java | 286 +++++++++--------- .../jetty/websocket/WebSocketHandler.java | 101 +++++++ .../jetty/websocket/WebSocketParser.java | 27 +- .../jetty/websocket/WebSocketServlet.java | 83 +++++ .../websocket/WebSocketGeneratorTest.java | 24 +- .../jetty/websocket/WebSocketParserTest.java | 17 +- .../jetty/websocket/WebSocketTest.java | 141 +++++++++ 31 files changed, 979 insertions(+), 224 deletions(-) create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/ConnectedEndPoint.java create mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/UpgradeConnectionException.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketBuffers.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketHandler.java create mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java create mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index caeebe50185..d495d7f0a0c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -74,6 +74,11 @@ public class HttpConnection implements Connection _parser = new HttpParser(responseBuffers,endp,new Handler()); } + public long getTimeStamp() + { + return -1; + } + public void setReserved (boolean reserved) { _reserved = reserved; diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java index 71b4e475fdf..d4464112d4f 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/SelectConnector.java @@ -28,6 +28,7 @@ import org.eclipse.jetty.http.HttpVersions; import org.eclipse.jetty.http.ssl.SslSelectChannelEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffers; +import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.ThreadLocalBuffers; import org.eclipse.jetty.io.nio.DirectNIOBuffer; @@ -210,6 +211,11 @@ class SelectConnector extends AbstractLifeCycle implements HttpClient.Connector, protected void endPointClosed(SelectChannelEndPoint endpoint) { } + + @Override + protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) + { + } @Override protected Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint) diff --git a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java index a9929693f51..45b965f50ed 100644 --- a/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java +++ b/jetty-http/src/main/java/org/eclipse/jetty/http/HttpGenerator.java @@ -450,12 +450,15 @@ public class HttpGenerator extends AbstractGenerator if (_buffer!=null) _buffer.clear(); // end the header. - _header.put(HttpTokens.CRLF); - _state = STATE_CONTENT; - return; - } - if (_status==204 || _status==304) + if (_status!=101 ) + { + _header.put(HttpTokens.CRLF); + _state = STATE_CONTENT; + return; + } + } + else if (_status==204 || _status==304) { _noContent=true; _content=null; diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java index e17b849aad4..484e3660b51 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ByteArrayEndPoint.java @@ -22,7 +22,7 @@ import java.io.IOException; * * */ -public class ByteArrayEndPoint implements EndPoint +public class ByteArrayEndPoint implements ConnectedEndPoint { byte[] _inBytes; ByteArrayBuffer _in; @@ -30,6 +30,7 @@ public class ByteArrayEndPoint implements EndPoint boolean _closed; boolean _nonBlocking; boolean _growOutput; + Connection _connection; /* ------------------------------------------------------------ */ /** @@ -39,6 +40,24 @@ public class ByteArrayEndPoint implements EndPoint { } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.ConnectedEndPoint#getConnection() + */ + public Connection getConnection() + { + return _connection; + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.io.ConnectedEndPoint#setConnection(org.eclipse.jetty.io.Connection) + */ + public void setConnection(Connection connection) + { + _connection=connection; + } + /* ------------------------------------------------------------ */ /** * @return the nonBlocking diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectedEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectedEndPoint.java new file mode 100644 index 00000000000..a7e66dd2842 --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/ConnectedEndPoint.java @@ -0,0 +1,7 @@ +package org.eclipse.jetty.io; + +public interface ConnectedEndPoint extends EndPoint +{ + Connection getConnection(); + void setConnection(Connection connection); +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java index 1c8b2a38a02..fdb07d027c3 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java @@ -17,7 +17,9 @@ import java.io.IOException; public interface Connection { - void handle() throws IOException; + void handle() throws IOException, UpgradeConnectionException; + + long getTimeStamp(); boolean isIdle(); boolean isSuspended(); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/UpgradeConnectionException.java b/jetty-io/src/main/java/org/eclipse/jetty/io/UpgradeConnectionException.java new file mode 100644 index 00000000000..8b4c32e594f --- /dev/null +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/UpgradeConnectionException.java @@ -0,0 +1,26 @@ +package org.eclipse.jetty.io; + +/* ------------------------------------------------------------ */ +/** Upgrade Connection Exception + * This exception is thrown when processing a protocol upgrade + * to exit all the current connection handling and to + * allow the {@link ConnectedEndPoint} to handle the new exception. + * + * Code that calls {@link org.eclipse.jetty.io.Connection#handle()} + * should catch this exception and call {@link ConnectedEndPoint#setConnection(org.eclipse.jetty.io.Connection)} + * with the new connection and then immediately call handle() again. + */ +public class UpgradeConnectionException extends RuntimeException +{ + Connection _connection; + + public UpgradeConnectionException(Connection newConnection) + { + _connection=newConnection; + } + + public Connection getConnection() + { + return _connection; + } +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java index 3eeb61052c5..ab0a8b20ee2 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @@ -21,8 +21,10 @@ import java.nio.channels.SocketChannel; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.UpgradeConnectionException; import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.thread.Timeout; @@ -34,7 +36,7 @@ import org.eclipse.jetty.util.thread.Timeout; * * */ -public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint +public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, AsyncEndPoint, ConnectedEndPoint { private final SelectorManager.SelectSet _selectSet; private final SelectorManager _manager; @@ -66,12 +68,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, _key = key; scheduleIdle(); } - - /* ------------------------------------------------------------ */ - public Connection getConnection() - { - return _connection; - } + /* ------------------------------------------------------------ */ public SelectorManager getSelectManager() @@ -79,10 +76,18 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, return _manager; } + /* ------------------------------------------------------------ */ + public Connection getConnection() + { + return _connection; + } + /* ------------------------------------------------------------ */ public void setConnection(Connection connection) { + Connection old=_connection; _connection=connection; + _manager.endPointUpgraded(this,old); } /* ------------------------------------------------------------ */ @@ -217,7 +222,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { int l = super.flush(header, buffer, trailer); - _writable = l!=0; + if (!(_writable=l!=0)) + { + synchronized (this) + { + if (!_dispatched) + updateKey(); + } + } return l; } @@ -228,7 +240,14 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, public int flush(Buffer buffer) throws IOException { int l = super.flush(buffer); - _writable = l!=0; + if (!(_writable=l!=0)) + { + synchronized (this) + { + if (!_dispatched) + updateKey(); + } + } return l; } @@ -445,6 +464,13 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements Runnable, { _connection.handle(); } + catch (UpgradeConnectionException e) + { + Log.debug(e.toString()); + Log.ignore(e); + setConnection(e.getConnection()); + continue; + } catch (ClosedChannelException e) { Log.ignore(e); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index f8c771cef7c..70c90b503f6 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -23,6 +23,7 @@ import java.nio.channels.SocketChannel; import java.util.ArrayList; import java.util.List; +import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.util.component.AbstractLifeCycle; @@ -234,6 +235,9 @@ public abstract class SelectorManager extends AbstractLifeCycle */ protected abstract void endPointOpened(SelectChannelEndPoint endpoint); + /* ------------------------------------------------------------ */ + protected abstract void endPointUpgraded(ConnectedEndPoint endpoint,Connection oldConnection); + /* ------------------------------------------------------------------------------- */ protected abstract Connection newConnection(SocketChannel channel, SelectChannelEndPoint endpoint); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java index 2c3ebba3fe9..7da4dba9c30 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AbstractConnector.java @@ -26,6 +26,7 @@ import org.eclipse.jetty.http.HttpHeaders; import org.eclipse.jetty.http.HttpSchemes; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.util.component.LifeCycle; @@ -964,7 +965,7 @@ public abstract class AbstractConnector extends HttpBuffers implements Connector } /* ------------------------------------------------------------ */ - protected void connectionOpened(HttpConnection connection) + protected void connectionOpened(Connection connection) { if (_statsStartedAt==-1) return; @@ -977,13 +978,24 @@ public abstract class AbstractConnector extends HttpBuffers implements Connector } /* ------------------------------------------------------------ */ - protected void connectionClosed(HttpConnection connection) + protected void connectionUpgraded(Connection oldConnection,Connection newConnection) + { + int requests=(oldConnection instanceof HttpConnection)?((HttpConnection)oldConnection).getRequests():0; + + synchronized(_statsLock) + { + _requests+=requests; + } + } + + /* ------------------------------------------------------------ */ + protected void connectionClosed(Connection connection) { if (_statsStartedAt>=0) { long duration=System.currentTimeMillis()-connection.getTimeStamp(); - int requests=connection.getRequests(); - + + int requests=(connection instanceof HttpConnection)?((HttpConnection)connection).getRequests():0; synchronized(_statsLock) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 7f096477b47..5bde2ff1f40 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -45,6 +45,7 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.io.UncheckedPrintWriter; +import org.eclipse.jetty.io.UpgradeConnectionException; import org.eclipse.jetty.io.BufferCache.CachedBuffer; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.util.QuotedStringTokenizer; @@ -563,6 +564,10 @@ public class HttpConnection implements Connection server.handleAsync(this); } } + catch (UpgradeConnectionException e) + { + throw e; + } catch (ContinuationThrowable e) { Log.ignore(e); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java index 5012fdc9297..0836a9ae39c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/LocalConnector.java @@ -20,7 +20,10 @@ import java.util.concurrent.LinkedBlockingQueue; import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.ByteArrayEndPoint; +import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.UpgradeConnectionException; import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.log.Log; public class LocalConnector extends AbstractConnector { @@ -101,17 +104,41 @@ public class LocalConnector extends AbstractConnector public void run() { - ByteArrayEndPoint endPoint = new ByteArrayEndPoint(requestsBuffer.asArray(), 1024); + ByteArrayEndPoint endPoint = new ByteArrayEndPoint(requestsBuffer.asArray(), 1024) + { + @Override + public void setConnection(Connection connection) + { + connectionUpgraded(getConnection(),connection); + super.setConnection(connection); + } + }; + endPoint.setGrowOutput(true); - HttpConnection connection = new HttpConnection(LocalConnector.this, endPoint, getServer()); + endPoint.setConnection(connection); connectionOpened(connection); - + boolean leaveOpen = keepOpen; try { while (endPoint.getIn().length() > 0) - connection.handle(); + { + while(true) + { + try + { + endPoint.getConnection().handle(); + break; + } + catch (UpgradeConnectionException e) + { + Log.debug(e.toString()); + Log.ignore(e); + endPoint.setConnection(e.getConnection()); + } + } + } } catch (Exception x) { diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java index de703860ddc..311a7a646cd 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/bio/SocketConnector.java @@ -23,8 +23,11 @@ import java.util.Set; import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ConnectedEndPoint; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.UpgradeConnectionException; import org.eclipse.jetty.io.bio.SocketEndPoint; import org.eclipse.jetty.server.AbstractConnector; import org.eclipse.jetty.server.HttpConnection; @@ -47,7 +50,7 @@ import org.eclipse.jetty.util.log.Log; public class SocketConnector extends AbstractConnector { protected ServerSocket _serverSocket; - protected final Set _connections; + protected final Set _connections; /* ------------------------------------------------------------ */ /** Constructor. @@ -55,7 +58,7 @@ public class SocketConnector extends AbstractConnector */ public SocketConnector() { - _connections=new HashSet(); + _connections=new HashSet(); } /* ------------------------------------------------------------ */ @@ -99,7 +102,7 @@ public class SocketConnector extends AbstractConnector Socket socket = _serverSocket.accept(); configure(socket); - Connection connection=new Connection(socket); + ConnectorEndPoint connection=new ConnectorEndPoint(socket); connection.dispatch(); } @@ -117,7 +120,7 @@ public class SocketConnector extends AbstractConnector public void customize(EndPoint endpoint, Request request) throws IOException { - Connection connection = (Connection)endpoint; + ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; int lrmit = isLowResources()?_lowResourceMaxIdleTime:_maxIdleTime; if (connection._sotimeout!=lrmit) { @@ -159,7 +162,7 @@ public class SocketConnector extends AbstractConnector Iterator iter=set.iterator(); while(iter.hasNext()) { - Connection connection = (Connection)iter.next(); + ConnectorEndPoint connection = (ConnectorEndPoint)iter.next(); connection.close(); } } @@ -167,20 +170,32 @@ public class SocketConnector extends AbstractConnector /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ - protected class Connection extends SocketEndPoint implements Runnable + protected class ConnectorEndPoint extends SocketEndPoint implements Runnable, ConnectedEndPoint { boolean _dispatched=false; - HttpConnection _connection; + volatile Connection _connection; int _sotimeout; - protected Socket _socket; + protected final Socket _socket; - public Connection(Socket socket) throws IOException + public ConnectorEndPoint(Socket socket) throws IOException { super(socket); _connection = newHttpConnection(this); _sotimeout=socket.getSoTimeout(); _socket=socket; } + + public Connection getConnection() + { + return _connection; + } + + public void setConnection(Connection connection) + { + if (_connection!=connection) + connectionUpgraded(_connection,connection); + _connection=connection; + } public void dispatch() throws IOException { @@ -203,7 +218,8 @@ public class SocketConnector extends AbstractConnector @Override public void close() throws IOException { - _connection.getRequest().getAsyncContinuation().cancel(); + if (_connection instanceof HttpConnection) + ((HttpConnection)_connection).getRequest().getAsyncContinuation().cancel(); super.close(); } @@ -231,7 +247,17 @@ public class SocketConnector extends AbstractConnector } } } - _connection.handle(); + try + { + _connection.handle(); + } + catch (UpgradeConnectionException e) + { + Log.debug(e.toString()); + Log.ignore(e); + setConnection(e.getConnection()); + continue; + } } } catch (EofException e) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java index 4b94cc78db0..2039c8b94a7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/BlockingChannelConnector.java @@ -21,8 +21,11 @@ import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; import org.eclipse.jetty.http.HttpException; +import org.eclipse.jetty.io.ConnectedEndPoint; +import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.EofException; +import org.eclipse.jetty.io.UpgradeConnectionException; import org.eclipse.jetty.io.nio.ChannelEndPoint; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Request; @@ -89,7 +92,7 @@ public class BlockingChannelConnector extends AbstractNIOConnector Socket socket=channel.socket(); configure(socket); - Connection connection=new Connection(channel); + ConnectorEndPoint connection=new ConnectorEndPoint(channel); connection.dispatch(); } @@ -98,7 +101,7 @@ public class BlockingChannelConnector extends AbstractNIOConnector public void customize(EndPoint endpoint, Request request) throws IOException { - Connection connection = (Connection)endpoint; + ConnectorEndPoint connection = (ConnectorEndPoint)endpoint; if (connection._sotimeout!=_maxIdleTime) { connection._sotimeout=_maxIdleTime; @@ -121,24 +124,41 @@ public class BlockingChannelConnector extends AbstractNIOConnector /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ /* ------------------------------------------------------------------------------- */ - private class Connection extends ChannelEndPoint implements Runnable + private class ConnectorEndPoint extends ChannelEndPoint implements Runnable, ConnectedEndPoint { - final HttpConnection _connection; + Connection _connection; boolean _dispatched=false; int _sotimeout; - Connection(ByteChannel channel) + ConnectorEndPoint(ByteChannel channel) { super(channel); _connection = new HttpConnection(BlockingChannelConnector.this,this,getServer()); } + + /* ------------------------------------------------------------ */ + /** Get the connection. + * @return the connection + */ + public Connection getConnection() + { + return _connection; + } + + /* ------------------------------------------------------------ */ + public void setConnection(Connection connection) + { + _connection=connection; + } + + /* ------------------------------------------------------------ */ void dispatch() throws IOException { if (!getThreadPool().dispatch(this)) { Log.warn("dispatch failed for {}",_connection); - Connection.this.close(); + ConnectorEndPoint.this.close(); } } @@ -162,25 +182,35 @@ public class BlockingChannelConnector extends AbstractNIOConnector } } } - _connection.handle(); + try + { + _connection.handle(); + } + catch (UpgradeConnectionException e) + { + Log.debug(e.toString()); + Log.ignore(e); + setConnection(e.getConnection()); + continue; + } } } catch (EofException e) { Log.debug("EOF", e); - try{Connection.this.close();} + try{ConnectorEndPoint.this.close();} catch(IOException e2){Log.ignore(e2);} } catch (HttpException e) { Log.debug("BAD", e); - try{Connection.this.close();} + try{ConnectorEndPoint.this.close();} catch(IOException e2){Log.ignore(e2);} } catch(Throwable e) { Log.warn("handle failed",e); - try{Connection.this.close();} + try{ConnectorEndPoint.this.close();} catch(IOException e2){Log.ignore(e2);} } finally diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index daeabe107b1..1f1e0dcd19a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -20,6 +20,7 @@ import java.nio.channels.SelectionKey; import java.nio.channels.ServerSocketChannel; import java.nio.channels.SocketChannel; +import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; @@ -89,14 +90,20 @@ public class SelectChannelConnector extends AbstractNIOConnector @Override protected void endPointClosed(final SelectChannelEndPoint endpoint) { - connectionClosed((HttpConnection)endpoint.getConnection()); + connectionClosed(endpoint.getConnection()); } @Override protected void endPointOpened(SelectChannelEndPoint endpoint) { // TODO handle max connections and low resources - connectionOpened((HttpConnection)endpoint.getConnection()); + connectionOpened(endpoint.getConnection()); + } + + @Override + protected void endPointUpgraded(ConnectedEndPoint endpoint, Connection oldConnection) + { + connectionUpgraded(oldConnection,endpoint.getConnection()); } @Override diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSocketConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSocketConnector.java index 37a1113f207..c2379df5d85 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSocketConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/ssl/SslSocketConnector.java @@ -164,7 +164,7 @@ public class SslSocketConnector extends SocketConnector implements SslConnector Socket socket = _serverSocket.accept(); configure(socket); - Connection connection=new SslConnection(socket); + ConnectorEndPoint connection=new SslConnection(socket); connection.dispatch(); } @@ -670,7 +670,7 @@ public class SslSocketConnector extends SocketConnector implements SslConnector /* ------------------------------------------------------------ */ - public class SslConnection extends Connection + public class SslConnection extends ConnectorEndPoint { public SslConnection(Socket socket) throws IOException { diff --git a/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java b/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java index a16cba5dc30..50669422784 100644 --- a/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java +++ b/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java @@ -39,6 +39,7 @@ import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.http.PathMap; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.RuntimeIOException; +import org.eclipse.jetty.io.UpgradeConnectionException; import org.eclipse.jetty.security.IdentityService; import org.eclipse.jetty.security.SecurityHandler; import org.eclipse.jetty.server.Dispatcher; @@ -438,6 +439,10 @@ public class ServletHandler extends ScopedHandler { throw e; } + catch(UpgradeConnectionException e) + { + throw e; + } catch(Exception e) { if (!(DispatcherType.REQUEST.equals(type) || DispatcherType.ASYNC.equals(type))) diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/StringUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/StringUtil.java index e596fb45d06..9fa6d46aa40 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/StringUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/StringUtil.java @@ -337,6 +337,24 @@ public class StringUtil return buf.toString(); } + /* ------------------------------------------------------------ */ + public static String printable(byte[] b) + { + StringBuilder buf = new StringBuilder(); + for (int i=0;i' ' && c<0x7f) + buf.append(c); + else + { + buf.append("0x"); + TypeUtil.toHex(b[i],buf); + } + } + return buf.toString(); + } + public static byte[] getBytes(String s) { try diff --git a/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java b/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java index ccc265dc482..5c508eb3684 100644 --- a/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java +++ b/jetty-util/src/main/java/org/eclipse/jetty/util/TypeUtil.java @@ -447,6 +447,20 @@ public class TypeUtil return 0; } + /* ------------------------------------------------------------ */ + public static void toHex(byte b,StringBuilder buf) + { + int bi=0xff&b; + int c='0'+(bi/16)%16; + if (c>'9') + c= 'A'+(c-'0'-10); + buf.append((char)c); + c='0'+bi%16; + if (c>'9') + c= 'A'+(c-'0'-10); + buf.append((char)c); + } + /* ------------------------------------------------------------ */ public static String toHexString(byte[] b) { diff --git a/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/WebAppContext.java b/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/WebAppContext.java index 67332908159..d6c57470eef 100644 --- a/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/WebAppContext.java +++ b/jetty-webapp/src/main/java/org/eclipse/jetty/webapp/WebAppContext.java @@ -95,12 +95,14 @@ public class WebAppContext extends ServletContextHandler "org.eclipse.jetty.jndi.", // webapp cannot change naming classes "org.eclipse.jetty.plus.jaas.", // webapp cannot change jetty jaas classes "org.eclipse.jetty.servlet.DefaultServlet", // webapp cannot change default servlets + "org.eclipse.jetty.websocket.", // WebSocket is a jetty extension }; private String[] _serverClasses = { "-org.eclipse.jetty.continuation.", // don't hide continuation classes "-org.eclipse.jetty.jndi.", // don't hide naming classes "-org.eclipse.jetty.plus.jaas.", // don't hide jaas modules "-org.eclipse.jetty.servlet.DefaultServlet", // webapp cannot change default servlets + "-org.eclipse.jetty.websocket.", // don't hide websocket extension "org.eclipse.jetty." // hide rest of jetty classes }; private File _tmpDir; diff --git a/jetty-websocket/pom.xml b/jetty-websocket/pom.xml index c84a9c5516e..c1e15e7e4cb 100644 --- a/jetty-websocket/pom.xml +++ b/jetty-websocket/pom.xml @@ -11,7 +11,7 @@ org.eclipse.jetty - jetty-io + jetty-server ${project.version} @@ -19,6 +19,11 @@ junit test + + javax.servlet + servlet-api + jar + diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java new file mode 100644 index 00000000000..c9c4ac9c688 --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocket.java @@ -0,0 +1,21 @@ +package org.eclipse.jetty.websocket; + +import java.io.IOException; + +public interface WebSocket +{ + public final byte LENGTH_FRAME=(byte)0x80; + public final byte SENTINEL_FRAME=(byte)0x00; + void onConnect(Outbound outbound); + void onMessage(byte frame,String data); + void onMessage(byte frame,byte[] data, int offset, int length); + void onDisconnect(); + + public interface Outbound + { + void sendMessage(byte frame,String data) throws IOException; + void sendMessage(byte frame,byte[] data) throws IOException; + void sendMessage(byte frame,byte[] data, int offset, int length) throws IOException; + void disconnect() throws IOException; + } +} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketBuffers.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketBuffers.java new file mode 100644 index 00000000000..729ec83bb39 --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketBuffers.java @@ -0,0 +1,59 @@ +package org.eclipse.jetty.websocket; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.io.ThreadLocalBuffers; +import org.eclipse.jetty.io.nio.DirectNIOBuffer; + + +/* ------------------------------------------------------------ */ +/** The WebSocket Buffer Pool. + * + * The normal buffers are byte array buffers so that user processes + * can access directly. However the generator uses direct buffers + * for the final output stage as they are filled in bulk and are more + * effecient to flush. + */ +public class WebSocketBuffers +{ + final private ThreadLocalBuffers _buffers; + + public WebSocketBuffers(final int bufferSize) + { + _buffers = new ThreadLocalBuffers() + { + @Override + protected Buffer newHeader(int size) + { + return new DirectNIOBuffer(bufferSize); + } + + @Override + protected Buffer newBuffer(int size) + { + return new ByteArrayBuffer(bufferSize); + } + + @Override + protected boolean isHeader(Buffer buffer) + { + return buffer instanceof DirectNIOBuffer; + } + }; + } + + public Buffer getBuffer() + { + return _buffers.getBuffer(); + } + + public Buffer getDirectBuffer() + { + return _buffers.getHeader(); + } + + public void returnBuffer(Buffer buffer) + { + _buffers.returnBuffer(buffer); + } +} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java index dcdb9f10cae..f19115533e0 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnection.java @@ -2,13 +2,64 @@ package org.eclipse.jetty.websocket; import java.io.IOException; +import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Connection; +import org.eclipse.jetty.io.EndPoint; +import org.eclipse.jetty.util.log.Log; -public class WebSocketConnection implements Connection +public class WebSocketConnection implements Connection, WebSocket.Outbound { - WebSocketParser _parser; - WebSocketGenerator _generator; - + final EndPoint _endp; + final WebSocketParser _parser; + final WebSocketGenerator _generator; + final long _timestamp; + final WebSocket _websocket; + final int _maxIdleTimeMs=30000; + + public WebSocketConnection(WebSocketBuffers buffers, EndPoint endpoint, long timestamp, WebSocket websocket) + { + _endp = endpoint; + _timestamp = timestamp; + _websocket = websocket; + _generator = new WebSocketGenerator(buffers, _endp); + _parser = new WebSocketParser(buffers, endpoint, new WebSocketParser.EventHandler() + { + public void onFrame(byte frame, String data) + { + try + { + _websocket.onMessage(frame,data); + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); + } + } + + public void onFrame(byte frame, Buffer buffer) + { + try + { + byte[] array=buffer.array(); + + _websocket.onMessage(frame,array,buffer.getIndex(),buffer.length()); + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); + } + } + }); + } + public void handle() throws IOException { boolean more=true; @@ -18,7 +69,7 @@ public class WebSocketConnection implements Connection int flushed=_generator.flush(); int filled=_parser.parseNext(); - more = flushed>0 || filled>0 || !_parser.isBufferEmpty(); + more = flushed>0 || filled>0 || !_parser.isBufferEmpty() || !_generator.isBufferEmpty(); } } @@ -32,4 +83,38 @@ public class WebSocketConnection implements Connection return false; } + public long getTimeStamp() + { + return _timestamp; + } + + public void sendMessage(byte frame, String content) throws IOException + { + _generator.addFrame(frame,content,_maxIdleTimeMs); + _generator.flush(); + } + + public void sendMessage(byte frame, byte[] content) throws IOException + { + _generator.addFrame(frame,content,_maxIdleTimeMs); + _generator.flush(); + } + + public void sendMessage(byte frame, byte[] content, int offset, int length) throws IOException + { + _generator.addFrame(frame,content,offset,length,_maxIdleTimeMs); + _generator.flush(); + } + + public void disconnect() throws IOException + { + _generator.flush(_maxIdleTimeMs); + _endp.close(); + } + + public void fill(Buffer buffer) + { + _parser.fill(buffer); + } + } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java index d4d89ca0550..b94ecaa19e2 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGenerator.java @@ -3,7 +3,6 @@ package org.eclipse.jetty.websocket; import java.io.IOException; import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.EndPoint; @@ -16,157 +15,173 @@ import org.eclipse.jetty.io.EndPoint; */ public class WebSocketGenerator { - final private Buffers _buffers; + final private WebSocketBuffers _buffers; final private EndPoint _endp; private Buffer _buffer; - public WebSocketGenerator(Buffers buffers, EndPoint endp) + public WebSocketGenerator(WebSocketBuffers buffers, EndPoint endp) { _buffers=buffers; _endp=endp; } - synchronized public boolean addMessage(byte frame,Buffer content, long blockFor) throws IOException + synchronized public void addFrame(byte frame,byte[] content, int blockFor) throws IOException { - if (_buffer==null) - _buffer=_buffers.getBuffer(); - else if (_buffer.length()>0) - flushBuffer(); - - int length=content.length(); - if (length>2097152) - throw new IllegalArgumentException("too big"); - - int length_bytes=(length>16384)?3:(length>128)?2:1; - - if (_buffer.space()>14))); - case 2: - _buffer.put((byte)(0x80|(0x7f&(length>>7)))); - case 1: - _buffer.put((byte)(0x7f&length)); - } - - _buffer.put(content); - return true; - } - - synchronized public boolean addMessage(byte frame, String content, long blockFor) throws IOException - { - if (_buffer==null) - _buffer=_buffers.getBuffer(); - else if (_buffer.length()>0) - flushBuffer(); - - int length=content.length(); - int space=waitForSpace(length+2,blockFor); - - _buffer.put((byte)(0x7f&frame)); - - for (int i = 0; i < length; i++) - { - int code = content.charAt(i); - - if ((code & 0xffffff80) == 0) - { - // 1b - if (space<1) - space=waitForSpace(1,blockFor); - _buffer.put((byte)(code)); - space--; - } - else if((code&0xfffff800)==0) - { - // 2b - if (space<2) - space=waitForSpace(2,blockFor); - _buffer.put((byte)(0xc0|(code>>6))); - _buffer.put((byte)(0x80|(code&0x3f))); - space-=2; - } - else if((code&0xffff0000)==0) - { - // 3b - if (space<3) - space=waitForSpace(3,blockFor); - _buffer.put((byte)(0xe0|(code>>12))); - _buffer.put((byte)(0x80|((code>>6)&0x3f))); - _buffer.put((byte)(0x80|(code&0x3f))); - space-=3; - } - else if((code&0xff200000)==0) - { - // 4b - if (space<4) - space=waitForSpace(4,blockFor); - _buffer.put((byte)(0xf0|(code>>18))); - _buffer.put((byte)(0x80|((code>>12)&0x3f))); - _buffer.put((byte)(0x80|((code>>6)&0x3f))); - _buffer.put((byte)(0x80|(code&0x3f))); - space-=4; - } - else if((code&0xf4000000)==0) - { - // 5b - if (space<5) - space=waitForSpace(5,blockFor); - _buffer.put((byte)(0xf8|(code>>24))); - _buffer.put((byte)(0x80|((code>>18)&0x3f))); - _buffer.put((byte)(0x80|((code>>12)&0x3f))); - _buffer.put((byte)(0x80|((code>>6)&0x3f))); - _buffer.put((byte)(0x80|(code&0x3f))); - space-=5; - } - else if((code&0x80000000)==0) - { - // 6b - if (space<6) - space=waitForSpace(6,blockFor); - _buffer.put((byte)(0xfc|(code>>30))); - _buffer.put((byte)(0x80|((code>>24)&0x3f))); - _buffer.put((byte)(0x80|((code>>18)&0x3f))); - _buffer.put((byte)(0x80|((code>>12)&0x3f))); - _buffer.put((byte)(0x80|((code>>6)&0x3f))); - _buffer.put((byte)(0x80|(code&0x3f))); - space-=6; - } - else - { - _buffer.put((byte)('?')); - space-=1; - } - } - - if (space<1) - space=waitForSpace(1,blockFor); - _buffer.put((byte)(0xff)); - - return true; + addFrame(frame,content,0,content.length,blockFor); } - private int waitForSpace(int needed, long blockFor) + synchronized public void addFrame(byte frame,byte[] content, int offset, int length, int blockFor) throws IOException + { + if (_buffer==null) + _buffer=_buffers.getDirectBuffer(); + + if ((frame&0x80)==0x80) + { + // Send in a length delimited frame + + // maximum of 3 byte length == 21 bits + if (length>2097152) + throw new IllegalArgumentException("too big"); + int length_bytes=(length>16384)?3:(length>128)?2:1; + int needed=length+1+length_bytes; + checkSpace(needed,blockFor); + + _buffer.put(frame); + + switch (length_bytes) + { + case 3: + _buffer.put((byte)(0x80|(length>>14))); + case 2: + _buffer.put((byte)(0x80|(0x7f&(length>>7)))); + case 1: + _buffer.put((byte)(0x7f&length)); + } + + _buffer.put(content,offset,length); + } + else + { + // send in a sentinel frame + int needed=length+2; + checkSpace(needed,blockFor); + + _buffer.put(frame); + _buffer.put(content,offset,length); + _buffer.put((byte)0xFF); + } + } + + synchronized public void addFrame(byte frame, String content, int blockFor) throws IOException + { + Buffer byte_buffer=_buffers.getBuffer(); + try + { + byte[] array=byte_buffer.array(); + + int chars = content.length(); + int bytes = 0; + final int limit=array.length-6; + + for (int i = 0; i < chars; i++) + { + int code = content.charAt(i); + + if (bytes>=limit) + throw new IllegalArgumentException("frame too large"); + + if ((code & 0xffffff80) == 0) + { + array[bytes++]=(byte)(code); + } + else if((code&0xfffff800)==0) + { + array[bytes++]=(byte)(0xc0|(code>>6)); + array[bytes++]=(byte)(0x80|(code&0x3f)); + } + else if((code&0xffff0000)==0) + { + array[bytes++]=(byte)(0xe0|(code>>12)); + array[bytes++]=(byte)(0x80|((code>>6)&0x3f)); + array[bytes++]=(byte)(0x80|(code&0x3f)); + } + else if((code&0xff200000)==0) + { + array[bytes++]=(byte)(0xf0|(code>>18)); + array[bytes++]=(byte)(0x80|((code>>12)&0x3f)); + array[bytes++]=(byte)(0x80|((code>>6)&0x3f)); + array[bytes++]=(byte)(0x80|(code&0x3f)); + } + else if((code&0xf4000000)==0) + { + array[bytes++]=(byte)(0xf8|(code>>24)); + array[bytes++]=(byte)(0x80|((code>>18)&0x3f)); + array[bytes++]=(byte)(0x80|((code>>12)&0x3f)); + array[bytes++]=(byte)(0x80|((code>>6)&0x3f)); + array[bytes++]=(byte)(0x80|(code&0x3f)); + } + else if((code&0x80000000)==0) + { + array[bytes++]=(byte)(0xfc|(code>>30)); + array[bytes++]=(byte)(0x80|((code>>24)&0x3f)); + array[bytes++]=(byte)(0x80|((code>>18)&0x3f)); + array[bytes++]=(byte)(0x80|((code>>12)&0x3f)); + array[bytes++]=(byte)(0x80|((code>>6)&0x3f)); + array[bytes++]=(byte)(0x80|(code&0x3f)); + } + else + { + array[bytes++]=(byte)('?'); + } + } + addFrame(frame,array,0,bytes,blockFor); + } + finally + { + _buffers.returnBuffer(byte_buffer); + } + } + + private void checkSpace(int needed, long blockFor) + throws IOException { int space=_buffer.space(); if (space0 && _endp.blockWritable(blockFor)) + { + flushBuffer(); + _buffer.compact(); + space=_buffer.space(); + } + } + if (space0) + { + if (_buffer==null) + _buffer=_buffers.getBuffer(); + _buffer.put(buffer); + buffer.clear(); + } + + + } + /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ public interface EventHandler { - void onUtf8Message(byte frame,String data); - void onBinaryMessage(byte frame,Buffer buffer); + void onFrame(byte frame,String data); + void onFrame(byte frame,Buffer buffer); } + } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java new file mode 100644 index 00000000000..dafbcd54006 --- /dev/null +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketServlet.java @@ -0,0 +1,83 @@ +package org.eclipse.jetty.websocket; + +import java.io.IOException; + +import javax.servlet.ServletException; +import javax.servlet.http.HttpServlet; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.http.HttpParser; +import org.eclipse.jetty.io.ConnectedEndPoint; +import org.eclipse.jetty.io.UpgradeConnectionException; +import org.eclipse.jetty.server.HttpConnection; + + +/* ------------------------------------------------------------ */ +/** + * Servlet to ugrade connections to WebSocket + *

+ * The request must have the correct upgrade headers, else it is + * handled as a normal servlet request. + *

+ * The initParameter "bufferSize" can be used to set the buffer size, + * which is also the max frame byte size (default 8192). + */ +public abstract class WebSocketServlet extends HttpServlet +{ + WebSocketBuffers _buffers; + + + /* ------------------------------------------------------------ */ + /** + * @see javax.servlet.GenericServlet#init() + */ + @Override + public void init() throws ServletException + { + String bs=getInitParameter("bufferSize"); + _buffers = new WebSocketBuffers(bs==null?8192:Integer.parseInt(bs)); + } + + /* ------------------------------------------------------------ */ + /** + * @see javax.servlet.http.HttpServlet#service(javax.servlet.http.HttpServletRequest, javax.servlet.http.HttpServletResponse) + */ + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + if ("WebSocket".equals(request.getHeader("Upgrade")) && + "HTTP/1.1".equals(request.getProtocol())) + { + WebSocket websocket=doWebSocketConnect(request,request.getHeader("WebSocket-Protocol")); + + if (websocket!=null) + { + HttpConnection http = HttpConnection.getCurrentConnection(); + ConnectedEndPoint endp = (ConnectedEndPoint)http.getEndPoint(); + WebSocketConnection connection = new WebSocketConnection(_buffers,endp,http.getTimeStamp(),websocket); + + response.setHeader("Upgrade","WebSocket"); + response.addHeader("Connection","Upgrade"); + response.sendError(101,"Web Socket Protocol Handshake"); + response.flushBuffer(); + + connection.fill(((HttpParser)http.getParser()).getHeaderBuffer()); + connection.fill(((HttpParser)http.getParser()).getBodyBuffer()); + + websocket.onConnect(connection); + throw new UpgradeConnectionException(connection); + } + else + { + response.sendError(503); + } + } + else + super.service(request,response); + } + + abstract protected WebSocket doWebSocketConnect(HttpServletRequest request,String protocol); + + +} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorTest.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorTest.java index e88561dc267..1ebb41aa6a0 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorTest.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorTest.java @@ -1,17 +1,11 @@ package org.eclipse.jetty.websocket; -import java.util.ArrayList; -import java.util.List; +import junit.framework.TestCase; -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.Buffers; import org.eclipse.jetty.io.ByteArrayBuffer; import org.eclipse.jetty.io.ByteArrayEndPoint; -import org.eclipse.jetty.io.SimpleBuffers; import org.eclipse.jetty.util.StringUtil; -import junit.framework.TestCase; - /* ------------------------------------------------------------ */ /** @@ -19,7 +13,7 @@ import junit.framework.TestCase; public class WebSocketGeneratorTest extends TestCase { - Buffers _buffers; + WebSocketBuffers _buffers; ByteArrayBuffer _out; ByteArrayEndPoint _endp; WebSocketGenerator _generator; @@ -28,17 +22,17 @@ public class WebSocketGeneratorTest extends TestCase @Override protected void setUp() throws Exception { - _buffers=new SimpleBuffers(null,new ByteArrayBuffer(1024)); + _buffers=new WebSocketBuffers(1024); _endp = new ByteArrayEndPoint(); _generator = new WebSocketGenerator(_buffers,_endp); _out = new ByteArrayBuffer(2048); _endp.setOut(_out); } - + /* ------------------------------------------------------------ */ public void testOneString() throws Exception { - _generator.addMessage((byte)0x04,"Hell\uFF4F W\uFF4Frld",0); + _generator.addFrame((byte)0x04,"Hell\uFF4F W\uFF4Frld",0); _generator.flush(); assertEquals(4,_out.get()); assertEquals('H',_out.get()); @@ -61,7 +55,7 @@ public class WebSocketGeneratorTest extends TestCase public void testOneBuffer() throws Exception { - _generator.addMessage((byte)0x04,new ByteArrayBuffer("Hell\uFF4F W\uFF4Frld",StringUtil.__UTF8),0); + _generator.addFrame((byte)0x84,"Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8),0); _generator.flush(); assertEquals(0x84,0xff&_out.get()); assertEquals(15,0xff&_out.get()); @@ -88,8 +82,7 @@ public class WebSocketGeneratorTest extends TestCase for (int i=0;i _data = new ArrayList(); - public void onBinaryMessage(byte frame, Buffer buffer) + public void onFrame(byte frame, Buffer buffer) { _data.add(buffer.toString(StringUtil.__UTF8)); } - public void onUtf8Message(byte frame, String data) + public void onFrame(byte frame, String data) { _data.add(data); } diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTest.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTest.java new file mode 100644 index 00000000000..7fa6bf850cd --- /dev/null +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketTest.java @@ -0,0 +1,141 @@ +package org.eclipse.jetty.websocket; + +import java.io.IOException; + +import javax.servlet.http.HttpServletRequest; + +import junit.framework.TestCase; + +import org.eclipse.jetty.io.Buffer; +import org.eclipse.jetty.io.ByteArrayBuffer; +import org.eclipse.jetty.server.LocalConnector; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.handler.DefaultHandler; +import org.eclipse.jetty.util.StringUtil; +import org.eclipse.jetty.util.log.Log; + +public class WebSocketTest extends TestCase +{ + TestWebSocket _websocket; + LocalConnector _connector; + Server _server; + WebSocketHandler _handler; + + + + /* ------------------------------------------------------------ */ + @Override + protected void setUp() throws Exception + { + _server = new Server(); + _connector = new LocalConnector(); + _server.addConnector(_connector); + _handler= new WebSocketHandler() + { + @Override + protected WebSocket doWebSocketConnect(HttpServletRequest request, String protocol) + { + _websocket = new TestWebSocket(); + return _websocket; + } + }; + _handler.setHandler(new DefaultHandler()); + _server.setHandler(_handler); + + _server.start(); + } + + + public void testNoWebSocket() throws Exception + { + String response = _connector.getResponses( + "GET /foo HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "\r\n",false); + + assertTrue(response.startsWith("HTTP/1.1 404 ")); + } + + /* ------------------------------------------------------------ */ + public void testOpenWebSocket() throws Exception + { + String response = _connector.getResponses( + "GET /demo HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Upgrade: WebSocket\r\n" + + "Connection: Upgrade\r\n" + + "\r\n",false); + + assertTrue(response.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake")); + assertTrue(response.contains("Upgrade: WebSocket")); + assertTrue(response.contains("Connection: Upgrade")); + } + + /* ------------------------------------------------------------ */ + public void testSendReceiveUtf8WebSocket() throws Exception + { + ByteArrayBuffer buffer = new ByteArrayBuffer(1024); + + buffer.put( + ("GET /demo HTTP/1.1\r\n" + + "Host: localhost\r\n" + + "Upgrade: WebSocket\r\n" + + "Connection: Upgrade\r\n" + + "\r\n").getBytes(StringUtil.__ISO_8859_1)); + + buffer.put((byte)0); + buffer.put("Hello World".getBytes(StringUtil.__UTF8)); + buffer.put((byte)0xFF); + + ByteArrayBuffer out = _connector.getResponses(buffer,true); + + String response = StringUtil.printable(out.asArray()); + + assertTrue(response.startsWith("HTTP/1.1 101 Web Socket Protocol Handshake")); + assertTrue(response.contains("Upgrade: WebSocket")); + assertTrue(response.contains("Connection: Upgrade")); + assertTrue(response.contains("0x00Roger That0xFF")); + + assertEquals("Hello World",_websocket._utf8); + } + + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + class TestWebSocket implements WebSocket + { + Outbound _outbound; + Buffer _binary; + String _utf8; + boolean _disconnected; + + public void onConnect(Outbound outbound) + { + _outbound=outbound; + try + { + _outbound.sendMessage(SENTINEL_FRAME,"Roger That"); + } + catch (IOException e) + { + Log.warn(e); + } + } + + public void onMessage(byte frame, byte[] data,int offset, int length) + { + _binary=new ByteArrayBuffer(data,offset,length).duplicate(Buffer.READONLY); + } + + public void onMessage(byte frame, String data) + { + _utf8=data; + } + + public void onDisconnect() + { + _disconnected=true; + } + } + +}