From 58652d8f09819df0e01685b1e98f8985d39c0af5 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Wed, 13 Apr 2011 11:05:35 +0000 Subject: [PATCH] 342700 refine websocket API for anticipated changes git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@3013 7e9141cc-0065-0410-87d8-b60c137991c4 --- VERSION.txt | 1 + .../eclipse/jetty/client/HttpConnection.java | 22 +- .../jetty/client/WebSocketUpgradeTest.java | 12 +- .../eclipse/jetty/io/AbstractConnection.java | 4 +- .../org/eclipse/jetty/io/AsyncEndPoint.java | 11 - .../java/org/eclipse/jetty/io/Connection.java | 5 + .../java/org/eclipse/jetty/io/Idleable.java | 27 - .../jetty/io/nio/SelectChannelEndPoint.java | 45 +- .../eclipse/jetty/io/nio/SelectorManager.java | 7 +- .../io/nio/SslSelectChannelEndPoint.java | 9 - .../eclipse/jetty/server/HttpConnection.java | 3 +- .../jetty/server/handler/ConnectHandler.java | 26 + .../server/nio/SelectChannelConnector.java | 48 +- .../eclipse/jetty/websocket/TestClient.java | 8 + .../eclipse/jetty/websocket/TestServer.java | 73 +- .../eclipse/jetty/websocket/WebSocket.java | 54 +- .../websocket/WebSocketConnectionD00.java | 182 ++--- .../websocket/WebSocketConnectionD06.java | 743 ++++++++++-------- .../jetty/websocket/WebSocketFactory.java | 17 +- .../websocket/WebSocketGeneratorD00.java | 2 +- .../websocket/WebSocketGeneratorD01.java | 181 ----- .../jetty/websocket/WebSocketParserD01.java | 224 ------ .../websocket/WebSocketGeneratorD00Test.java | 89 +-- .../websocket/WebSocketGeneratorD01Test.java | 97 --- .../jetty/websocket/WebSocketLoadTest.java | 6 +- .../websocket/WebSocketMessageD00Test.java | 4 +- .../websocket/WebSocketMessageD01Test.java | 321 -------- .../websocket/WebSocketMessageD06Test.java | 12 +- .../websocket/WebSocketMessageW75Test.java | 190 ----- .../websocket/WebSocketParserD01Test.java | 190 ----- .../java/com/acme/WebSocketChatServlet.java | 6 +- 31 files changed, 722 insertions(+), 1897 deletions(-) delete mode 100644 jetty-io/src/main/java/org/eclipse/jetty/io/Idleable.java delete mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java delete mode 100644 jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java delete mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01Test.java delete mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageD01Test.java delete mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketMessageW75Test.java delete mode 100644 jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketParserD01Test.java diff --git a/VERSION.txt b/VERSION.txt index bd2ac7f1aa6..5d05767c26a 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -1,5 +1,6 @@ jetty-7.4.0-SNAPSHOT + 342504 Scanner Listener + + 342700 refine websocket API for anticipated changes + JETTY-1362 Set root cause of UnavailableException + Various test harness cleanups to avoid random failures diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java index 1ac5410ae29..f3700077358 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/HttpConnection.java @@ -29,6 +29,7 @@ import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpSchemes; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpVersions; +import org.eclipse.jetty.io.AbstractConnection; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.Buffers; @@ -44,7 +45,7 @@ import org.eclipse.jetty.util.thread.Timeout; * * @version $Revision: 879 $ $Date: 2009-09-11 16:13:28 +0200 (Fri, 11 Sep 2009) $ */ -public class HttpConnection /* extends AbstractConnection */ implements Connection +public class HttpConnection extends AbstractConnection { private HttpDestination _destination; private HttpGenerator _generator; @@ -75,8 +76,7 @@ public class HttpConnection /* extends AbstractConnection */ implements Connecti HttpConnection(Buffers requestBuffers, Buffers responseBuffers, EndPoint endp) { - _endp=endp; - _timeStamp = System.currentTimeMillis(); + super(endp); _generator = new HttpGenerator(requestBuffers,endp); _parser = new HttpParser(responseBuffers,endp,new Handler()); @@ -431,7 +431,7 @@ public class HttpConnection /* extends AbstractConnection */ implements Connecti if (!_generator.isComplete() && _generator.getBytesBuffered()>0 && _endp instanceof AsyncEndPoint) { // Assume we are write blocked! - ((AsyncEndPoint)_endp).setWritable(false); + ((AsyncEndPoint)_endp).scheduleWrite(); } } @@ -727,18 +727,4 @@ public class HttpConnection /* extends AbstractConnection */ implements Connecti } } } - - - - // TODO remove and use AbstractConnection for 7.4 - private final long _timeStamp; - protected final EndPoint _endp; - public long getTimeStamp() - { - return _timeStamp; - } - public EndPoint getEndPoint() - { - return _endp; - } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java index ce3491509a6..4173d7d3c17 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/WebSocketUpgradeTest.java @@ -77,11 +77,11 @@ public class WebSocketUpgradeTest extends TestCase { Connection _connection; - public void onDisconnect(int closeCode, String message) + public void onClose(int closeCode, String message) { } - public void onConnect(Connection connection) + public void onOpen(Connection connection) { _connection=connection; _results.add("clientWS.onConnect"); @@ -118,11 +118,11 @@ public class WebSocketUpgradeTest extends TestCase protected Connection onSwitchProtocol(EndPoint endp) throws IOException { waitFor(3); - WebSocketConnectionD00 connection = new WebSocketConnectionD00(clientWS,endp,new WebSocketBuffers(4096),System.currentTimeMillis(),1000,"",0); + WebSocketConnectionD00 connection = new WebSocketConnectionD00(clientWS,endp,new WebSocketBuffers(4096),System.currentTimeMillis(),1000,""); _results.add("onSwitchProtocol"); _results.add(connection); - clientWS.onConnect(connection); + clientWS.onOpen(connection); return connection; } @@ -215,7 +215,7 @@ public class WebSocketUpgradeTest extends TestCase { Connection _connection; - public void onConnect(Connection connection) + public void onOpen(Connection connection) { _connection=connection; _webSockets.add(this); @@ -229,7 +229,7 @@ public class WebSocketUpgradeTest extends TestCase _results.add(data); } - public void onDisconnect(int code, String message) + public void onClose(int code, String message) { _results.add("onDisconnect"); _webSockets.remove(this); diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java index e1a5125d229..cea32d8b173 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AbstractConnection.java @@ -5,7 +5,7 @@ import java.io.IOException; import org.eclipse.jetty.util.log.Log; -public abstract class AbstractConnection implements Connection, Idleable +public abstract class AbstractConnection implements Connection { private final long _timeStamp; protected final EndPoint _endp; @@ -48,4 +48,4 @@ public abstract class AbstractConnection implements Connection, Idleable { return super.toString()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); } -} \ No newline at end of file +} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java index f7167b4d915..5eebc36ad80 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/AsyncEndPoint.java @@ -28,17 +28,6 @@ public interface AsyncEndPoint extends EndPoint */ public boolean isReadyForDispatch(); - - /* ------------------------------------------------------------ */ - /** Set the writable status. - * The writable status is considered next time the async scheduling - * is calculated. - * - * @param writable true if the endpoint is known to be writable or false - * if it is known to not be writable. - */ - public void setWritable(boolean writable); - /* ------------------------------------------------------------ */ /** Schedule a write dispatch. * Set the endpoint to not be writable and schedule a dispatch when diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java index 926db3a3d4f..431964afe8f 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/Connection.java @@ -51,4 +51,9 @@ public interface Connection * Called when the connection is closed */ void closed(); + + /** + * Called when the connection idle timeout expires + */ + void idleExpired(); } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/Idleable.java b/jetty-io/src/main/java/org/eclipse/jetty/io/Idleable.java deleted file mode 100644 index d0ac4cf94bf..00000000000 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/Idleable.java +++ /dev/null @@ -1,27 +0,0 @@ -// ======================================================================== -// Copyright (c) 2006-2009 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== - -package org.eclipse.jetty.io; - - -/* ------------------------------------------------------------ */ -/** Idleable. - * @deprecated Merge this into Connection at the next point release - */ -public interface Idleable -{ - /** - * Called when the connection idle timeout expires - */ - void idleExpired(); -} diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java index c65c1bec2fc..69f080c42dd 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectChannelEndPoint.java @@ -24,7 +24,6 @@ import org.eclipse.jetty.io.Buffer; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.io.Connection; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.Idleable; import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; import org.eclipse.jetty.util.log.Log; @@ -52,7 +51,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo private boolean _writeBlocked; private boolean _open; private volatile long _idleTimestamp; - private boolean _changing=false; /* ------------------------------------------------------------ */ public SelectChannelEndPoint(SocketChannel channel, SelectSet selectSet, SelectionKey key, int maxIdleTime) @@ -199,7 +197,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo { _dispatched = false; Log.warn("Dispatched Failed! "+this+" to "+_manager); - new Throwable().printStackTrace(); updateKey(); } } @@ -250,21 +247,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo /* ------------------------------------------------------------ */ protected void idleExpired() { - if (_connection instanceof Idleable) - { - ((Idleable)_connection).idleExpired(); - } - else - { - try - { - shutdownOutput(); - } - catch(IOException e) - { - Log.ignore(e); - } - } + _connection.idleExpired(); } /* ------------------------------------------------------------ */ @@ -274,14 +257,19 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo public int flush(Buffer header, Buffer buffer, Buffer trailer) throws IOException { int l = super.flush(header, buffer, trailer); - if (!(_writable=l!=0)) + + // If there was something to write and it wasn't written, then we are not writable. + if (l==0 && ( header!=null && header.hasContent() || buffer!=null && buffer.hasContent() || trailer!=null && trailer.hasContent())) { synchronized (this) { + _writable=false; if (!_dispatched) updateKey(); } } + else + _writable=true; return l; } @@ -292,14 +280,20 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo public int flush(Buffer buffer) throws IOException { int l = super.flush(buffer); - if (!(_writable=l!=0)) + + // If there was something to write and it wasn't written, then we are not writable. + if (l==0 && buffer!=null && buffer.hasContent()) { synchronized (this) { + _writable=false; if (!_dispatched) updateKey(); } } + else + _writable=true; + return l; } @@ -390,13 +384,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo } return true; } - - /* ------------------------------------------------------------ */ - public void setWritable(boolean writable) - { - _writable=writable; - } - + /* ------------------------------------------------------------ */ public void scheduleWrite() { @@ -433,7 +421,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo if(_interestOps == ops && getChannel().isOpen()) return; - _changing=true; } _selectSet.addChange(this); _selectSet.wakeup(); @@ -447,7 +434,6 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo { synchronized (this) { - _changing=false; if (getChannel().isOpen()) { if (_interestOps>0) @@ -529,6 +515,7 @@ public class SelectChannelEndPoint extends ChannelEndPoint implements AsyncEndPo final Connection next = _connection.handle(); if (next!=_connection) { + Log.debug("{} replaced {}",next,_connection); _connection=next; continue; } diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java index 007aa130f85..3d0cf0e0f17 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SelectorManager.java @@ -516,7 +516,8 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa Object att = key.attachment(); if (att instanceof SelectChannelEndPoint) { - ((SelectChannelEndPoint)att).schedule(); + if (key.isReadable()||key.isWritable()) + ((SelectChannelEndPoint)att).schedule(); } else if (key.isConnectable()) { @@ -553,7 +554,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa SelectChannelEndPoint endpoint = createEndPoint(channel,key); key.attach(endpoint); if (key.isReadable()) - endpoint.schedule(); + endpoint.schedule(); } key = null; } @@ -837,7 +838,7 @@ public abstract class SelectorManager extends AbstractLifeCycle implements Dumpa for (int i=0;i<100 && _selecting!=null;i++) { wakeup(); - Thread.sleep(1); + Thread.sleep(10); } } catch(Exception e) diff --git a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java index ccc0f27d220..8dd583bee62 100644 --- a/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java +++ b/jetty-io/src/main/java/org/eclipse/jetty/io/nio/SslSelectChannelEndPoint.java @@ -954,15 +954,6 @@ public class SslSelectChannelEndPoint extends SelectChannelEndPoint return _engine; } - /* ------------------------------------------------------------ */ - @Override - public void setWritable(boolean writable) - { - // only set !writable if we are not waiting for input - if (writable || !HandshakeStatus.NEED_UNWRAP.equals(_engine.getHandshakeStatus()) || super.isBufferingOutput()) - super.setWritable(writable); - } - /* ------------------------------------------------------------ */ @Override public void scheduleWrite() diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index d2f35e337b0..82e2ed6b749 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -503,7 +503,7 @@ public class HttpConnection extends AbstractConnection implements Connection more_in_buffer=false; } else if (_generator.isCommitted() && !_generator.isComplete() && _endp instanceof AsyncEndPoint) - ((AsyncEndPoint)_endp).setWritable(false); + ((AsyncEndPoint)_endp).scheduleWrite(); } } } @@ -802,6 +802,7 @@ public class HttpConnection extends AbstractConnection implements Connection /* ------------------------------------------------------------ */ public void closed() { + Log.debug("closed {}",this); } /* ------------------------------------------------------------ */ diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java index 596d930c76c..1e60836af82 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ConnectHandler.java @@ -670,6 +670,19 @@ public class ConnectHandler extends HandlerWrapper writeData(); _endPoint.shutdownOutput(); } + + public void idleExpired() + { + try + { + shutdownOutput(); + } + catch(Exception e) + { + Log.debug(e); + close(); + } + } } public class ClientToProxyConnection implements Connection @@ -819,6 +832,19 @@ public class ConnectHandler extends HandlerWrapper { _endPoint.shutdownOutput(); } + + public void idleExpired() + { + try + { + shutdownOutput(); + } + catch(Exception e) + { + Log.debug(e); + close(); + } + } } /** diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java index f79c27acb41..61154f5533e 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/nio/SelectChannelConnector.java @@ -28,8 +28,10 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.io.nio.SelectorManager; import org.eclipse.jetty.io.nio.SelectorManager.SelectSet; +import org.eclipse.jetty.server.Connector; import org.eclipse.jetty.server.HttpConnection; import org.eclipse.jetty.server.Request; +import org.eclipse.jetty.server.Server; import org.eclipse.jetty.util.component.AggregateLifeCycle; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.thread.Timeout.Task; @@ -304,6 +306,7 @@ public class SelectChannelConnector extends AbstractNIOConnector return new SelectChannelEndPoint(channel,selectSet,key, SelectChannelConnector.this._maxIdleTime); } + /* ------------------------------------------------------------------------------- */ protected void endPointClosed(SelectChannelEndPoint endpoint) { connectionClosed(endpoint.getConnection()); @@ -312,22 +315,7 @@ public class SelectChannelConnector extends AbstractNIOConnector /* ------------------------------------------------------------------------------- */ protected Connection newConnection(SocketChannel channel,final SelectChannelEndPoint endpoint) { - return new HttpConnection(SelectChannelConnector.this,endpoint,getServer()) - { - /* ------------------------------------------------------------ */ - @Override - public void cancelTimeout(Task task) - { - endpoint.getSelectSet().cancelTimeout(task); - } - - /* ------------------------------------------------------------ */ - @Override - public void scheduleTimeout(Task task, long timeoutMs) - { - endpoint.getSelectSet().scheduleTimeout(task,timeoutMs); - } - }; + return new SelectChannelHttpConnection(SelectChannelConnector.this,endpoint,getServer(),endpoint); } /* ------------------------------------------------------------ */ @@ -341,6 +329,34 @@ public class SelectChannelConnector extends AbstractNIOConnector AggregateLifeCycle.dump(out,indent,Arrays.asList(new Object[]{_acceptChannel,_acceptChannel.isOpen()?"OPEN":"CLOSED",_manager})); } + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private class SelectChannelHttpConnection extends HttpConnection + { + private final SelectChannelEndPoint _endpoint; + + private SelectChannelHttpConnection(Connector connector, EndPoint endpoint, Server server, SelectChannelEndPoint endpoint2) + { + super(connector,endpoint,server); + _endpoint = endpoint2; + } + + /* ------------------------------------------------------------ */ + @Override + public void cancelTimeout(Task task) + { + _endpoint.getSelectSet().cancelTimeout(task); + } + + /* ------------------------------------------------------------ */ + @Override + public void scheduleTimeout(Task task, long timeoutMs) + { + _endpoint.getSelectSet().scheduleTimeout(task,timeoutMs); + } + } + /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java index 642671a2dd4..99cf793959a 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestClient.java @@ -21,6 +21,9 @@ import org.eclipse.jetty.util.log.Log; /** * @version $Revision$ $Date$ + * + * This is not a general purpose websocket client. + * It's only for testing the websocket server and is hardwired to a specific draft version of the protocol. */ public class TestClient { @@ -66,6 +69,11 @@ public class TestClient _socket.close(); return; } + else if (opcode == WebSocketConnectionD06.OP_PING) + { + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length(),_socket.getSoTimeout()); + _generator.flush(_socket.getSoTimeout()); + } _messageBytes+=buffer.length(); diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestServer.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestServer.java index f53eb20bc1c..a9e5df35cfc 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestServer.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/TestServer.java @@ -11,7 +11,6 @@ import org.eclipse.jetty.server.nio.SelectChannelConnector; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.websocket.WebSocket.Connection; public class TestServer extends Server { @@ -62,7 +61,7 @@ public class TestServer extends Server _rHandler=new ResourceHandler(); _rHandler.setDirectoriesListed(true); - _rHandler.setResourceBase("."); + _rHandler.setResourceBase("src/test/webapp"); _wsHandler.setHandler(_rHandler); } @@ -95,21 +94,27 @@ public class TestServer extends Server /* ------------------------------------------------------------ */ class TestWebSocket implements WebSocket, WebSocket.OnFrame, WebSocket.OnBinaryMessage, WebSocket.OnTextMessage, WebSocket.OnControl { - protected Connection _connection; + protected FrameConnection _connection; - public Connection getOutbound() + public FrameConnection getConnection() { return _connection; } - public void onConnect(Connection connection) + public void onOpen(Connection connection) { - _connection = connection; if (_verbose) - System.err.printf("%s#onConnect %s\n",this.getClass().getSimpleName(),connection); + System.err.printf("%s#onOpen %s\n",this.getClass().getSimpleName(),connection); + } + + public void onHandshake(FrameConnection connection) + { + if (_verbose) + System.err.printf("%s#onHandshake %s %s\n",this.getClass().getSimpleName(),connection,connection.getClass().getSimpleName()); + _connection = connection; } - public void onDisconnect(int code,String message) + public void onClose(int code,String message) { if (_verbose) System.err.printf("%s#onDisonnect %d %s\n",this.getClass().getSimpleName(),code,message); @@ -147,9 +152,9 @@ public class TestServer extends Server class TestEchoWebSocket extends TestWebSocket { @Override - public void onConnect(Connection connection) + public void onOpen(Connection connection) { - super.onConnect(connection); + super.onOpen(connection); connection.setMaxTextMessageSize(-1); connection.setMaxBinaryMessageSize(-1); } @@ -160,16 +165,8 @@ public class TestServer extends Server super.onFrame(flags,opcode,data,offset,length); try { - switch(opcode) - { - case WebSocketConnectionD06.OP_CLOSE: - case WebSocketConnectionD06.OP_PING: - case WebSocketConnectionD06.OP_PONG: - break; - default: - getOutbound().sendFrame(flags,opcode,data,offset,length); - } - } + if (!getConnection().isControl(opcode)) + getConnection().sendFrame(flags,opcode,data,offset,length); } catch (IOException e) { e.printStackTrace(); @@ -184,16 +181,16 @@ public class TestServer extends Server class TestEchoBroadcastWebSocket extends TestWebSocket { @Override - public void onConnect(Connection connection) + public void onOpen(Connection connection) { - super.onConnect(connection); + super.onOpen(connection); _broadcast.add(this); } @Override - public void onDisconnect(int code,String message) + public void onClose(int code,String message) { - super.onDisconnect(code,message); + super.onClose(code,message); _broadcast.remove(this); } @@ -205,7 +202,7 @@ public class TestServer extends Server { try { - ws.getOutbound().sendMessage(data,offset,length); + ws.getConnection().sendMessage(data,offset,length); } catch (IOException e) { @@ -223,7 +220,7 @@ public class TestServer extends Server { try { - ws.getOutbound().sendMessage(data); + ws.getConnection().sendMessage(data); } catch (IOException e) { @@ -240,9 +237,9 @@ public class TestServer extends Server { @Override - public void onConnect(Connection connection) + public void onOpen(Connection connection) { - super.onConnect(connection); + super.onOpen(connection); connection.setMaxTextMessageSize(64*1024); connection.setMaxBinaryMessageSize(64*1024); } @@ -253,7 +250,7 @@ public class TestServer extends Server super.onMessage(data,offset,length); try { - getOutbound().sendMessage(data,offset,length); + getConnection().sendMessage(data,offset,length); } catch (IOException e) { @@ -267,7 +264,7 @@ public class TestServer extends Server super.onMessage(data); try { - getOutbound().sendMessage(data); + getConnection().sendMessage(data); } catch (IOException e) { @@ -281,9 +278,9 @@ public class TestServer extends Server class TestEchoFragmentWebSocket extends TestWebSocket { @Override - public void onConnect(Connection connection) + public void onOpen(Connection connection) { - super.onConnect(connection); + super.onOpen(connection); connection.setMaxTextMessageSize(64*1024); connection.setMaxBinaryMessageSize(64*1024); } @@ -294,8 +291,8 @@ public class TestServer extends Server super.onMessage(data,offset,length); try { - getOutbound().sendFrame((byte)0x0,WebSocketConnectionD06.OP_BINARY,data,offset,length/2); - getOutbound().sendFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,data,offset+length/2,length-length/2); + getConnection().sendFrame((byte)0x0,getConnection().binaryOpcode(),data,offset,length/2); + getConnection().sendFrame((byte)0x8,getConnection().binaryOpcode(),data,offset+length/2,length-length/2); } catch (IOException e) { @@ -312,8 +309,8 @@ public class TestServer extends Server byte[] data = message.getBytes(StringUtil.__UTF8); int offset=0; int length=data.length; - getOutbound().sendFrame((byte)0x0,WebSocketConnectionD06.OP_TEXT,data,offset,length/2); - getOutbound().sendFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,offset+length/2,length-length/2); + getConnection().sendFrame((byte)0x0,getConnection().textOpcode(),data,offset,length/2); + getConnection().sendFrame((byte)0x8,getConnection().textOpcode(),data,offset+length/2,length-length/2); } catch (IOException e) { @@ -327,7 +324,7 @@ public class TestServer extends Server System.err.println("java -cp CLASSPATH "+TestServer.class+" [ OPTIONS ]"); System.err.println(" -p|--port PORT (default 8080)"); System.err.println(" -v|--verbose "); - System.err.println(" -d|--docroot file (default '.')"); + System.err.println(" -d|--docroot file (default 'src/test/webapp')"); System.exit(1); } @@ -337,7 +334,7 @@ public class TestServer extends Server { int port=8080; boolean verbose=false; - String docroot="."; + String docroot="src/test/webapp"; for (int i=0;i=0 max size of text frame aggregation buffer in characters + */ void setMaxTextMessageSize(int size); + + /** + * @param size size<0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer + */ void setMaxBinaryMessageSize(int size); /** * Size in characters of the maximum text message to be received - * @return <0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters + * @return size <0 No aggregation of frames to messages, >=0 max size of text frame aggregation buffer in characters */ int getMaxTextMessageSize(); /** * Size in bytes of the maximum binary message to be received - * @return <0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer + * @return size <0 no aggregation of binary frames, >=0 size of binary frame aggregation buffer */ int getMaxBinaryMessageSize(); } + /** + * Frame Level Connection + *

The Connection interface at the level of sending/receiving frames rather than messages. + * + */ + public interface FrameConnection extends Connection + { + boolean isMessageComplete(byte flags); + void close(int closeCode,String message); + byte binaryOpcode(); + byte textOpcode(); + + boolean isControl(byte opcode); + boolean isText(byte opcode); + boolean isBinary(byte opcode); + boolean isContinuation(byte opcode); + boolean isClose(byte opcode); + boolean isPing(byte opcode); + boolean isPong(byte opcode); + + void sendControl(byte control,byte[] data, int offset, int length) throws IOException; + void sendFrame(byte flags,byte opcode,byte[] data, int offset, int length) throws IOException; + } + } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java index 6b4d004ce31..4e30a5310fe 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD00.java @@ -31,13 +31,13 @@ import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.util.StringUtil; import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.websocket.WebSocket.OnFrame; -public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.Connection +public class WebSocketConnectionD00 extends AbstractConnection implements WebSocketConnection, WebSocket.FrameConnection { public final static byte LENGTH_FRAME=(byte)0x80; public final static byte SENTINEL_FRAME=(byte)0x00; - final IdleCheck _idle; final WebSocketParser _parser; final WebSocketGenerator _generator; @@ -47,11 +47,10 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc String _key2; ByteArrayBuffer _hixieBytes; - public WebSocketConnectionD00(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, int draft) + public WebSocketConnectionD00(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol) throws IOException { super(endpoint,timestamp); - // TODO - can we use the endpoint idle mechanism? if (endpoint instanceof AsyncEndPoint) ((AsyncEndPoint)endpoint).cancelIdle(); @@ -60,19 +59,9 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc _websocket = websocket; _protocol=protocol; - // Select the parser/generators to use - switch(draft) - { - case 1: - _generator = new WebSocketGeneratorD01(buffers, _endp); - _parser = new WebSocketParserD01(buffers, endpoint, new FrameHandlerD1(_websocket)); - break; - default: - _generator = new WebSocketGeneratorD00(buffers, _endp); - _parser = new WebSocketParserD00(buffers, endpoint, new FrameHandlerD0(_websocket)); - } + _generator = new WebSocketGeneratorD00(buffers, _endp); + _parser = new WebSocketParserD00(buffers, endpoint, new FrameHandlerD0(_websocket)); - // TODO should these be AsyncEndPoint checks/calls? if (_endp instanceof SelectChannelEndPoint) { final SelectChannelEndPoint scep=(SelectChannelEndPoint)_endp; @@ -95,7 +84,8 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc }; } } - + + /* ------------------------------------------------------------ */ public void setHixieKeys(String key1,String key2) { _key1=key1; @@ -103,6 +93,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc _hixieBytes=new IndirectNIOBuffer(16); } + /* ------------------------------------------------------------ */ public Connection handle() throws IOException { try @@ -146,7 +137,9 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc } } - _websocket.onConnect(this); + if (_websocket instanceof OnFrame) + ((OnFrame)_websocket).onHandshake(this); + _websocket.onOpen(this); return this; } @@ -169,6 +162,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc } catch(IOException e) { + Log.debug(e); try { _endp.close(); @@ -184,12 +178,19 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc if (_endp.isOpen()) { _idle.access(_endp); + + if (_endp.isInputShutdown() && _generator.isBufferEmpty()) + _endp.close(); + else + checkWriteable(); + checkWriteable(); } } return this; } + /* ------------------------------------------------------------ */ private void doTheHixieHixieShake() { byte[] result=WebSocketConnectionD00.doTheHixieHixieShake( @@ -199,25 +200,29 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc _hixieBytes.clear(); _hixieBytes.put(result); } - + + /* ------------------------------------------------------------ */ public boolean isOpen() { return _endp!=null&&_endp.isOpen(); } + /* ------------------------------------------------------------ */ public boolean isIdle() { return _parser.isBufferEmpty() && _generator.isBufferEmpty(); } + /* ------------------------------------------------------------ */ public boolean isSuspended() { return false; } + /* ------------------------------------------------------------ */ public void closed() { - _websocket.onDisconnect(0,""); + _websocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,""); } /* ------------------------------------------------------------ */ @@ -265,7 +270,7 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc } /* ------------------------------------------------------------ */ - public void disconnect(int code, String message) + public void close(int code, String message) { throw new UnsupportedOperationException(); } @@ -378,7 +383,9 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc response.addHeader("WebSocket-Protocol",subprotocol); response.sendError(101,"Web Socket Protocol Handshake"); response.flushBuffer(); - _websocket.onConnect(this); + if (_websocket instanceof OnFrame) + ((OnFrame)_websocket).onHandshake(this); + _websocket.onOpen(this); } } @@ -444,92 +451,57 @@ public class WebSocketConnectionD00 extends AbstractConnection implements WebSoc public void close(int code,String message) { - disconnect(code,message); + close(code,message); } } - - class FrameHandlerD1 implements WebSocketParser.FrameHandler + + public boolean isMessageComplete(byte flags) { - public final static byte PING=1; - public final static byte PONG=1; - - final WebSocket _websocket; - final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); - boolean _fragmented=false; - - FrameHandlerD1(WebSocket websocket) - { - _websocket=websocket; - } - - public void onFrame(byte flags, byte opcode, Buffer buffer) - { - try - { - byte[] array=buffer.array(); - - if (opcode==0) - { - if (isMore(flags)) - { - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - _fragmented=true; - } - else if (_fragmented) - { - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - if (_websocket instanceof WebSocket.OnTextMessage) - ((WebSocket.OnTextMessage)_websocket).onMessage(_utf8.toString()); - _utf8.reset(); - _fragmented=false; - } - else - { - if (_websocket instanceof WebSocket.OnTextMessage) - ((WebSocket.OnTextMessage)_websocket).onMessage(buffer.toString(StringUtil.__UTF8)); - } - } - else if (opcode==PING) - { - sendFrame(flags,PONG,buffer.array(),buffer.getIndex(),buffer.length()); - } - else if (opcode==PONG) - { - - } - else - { - if (isMore(flags)) - { - if (_websocket instanceof WebSocket.OnFrame) - ((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()); - } - else if (_fragmented) - { - if (_websocket instanceof WebSocket.OnFrame) - ((WebSocket.OnFrame)_websocket).onFrame(flags,opcode,array,buffer.getIndex(),buffer.length()); - } - else - { - if (_websocket instanceof WebSocket.OnBinaryMessage) - ((WebSocket.OnBinaryMessage)_websocket).onMessage(array,buffer.getIndex(),buffer.length()); - } - } - } - catch(ThreadDeath th) - { - throw th; - } - catch(Throwable th) - { - Log.warn(th); - } - } - - public void close(int code,String message) - { - disconnect(code,message); - } + return true; } + public byte binaryOpcode() + { + return LENGTH_FRAME; + } + + public byte textOpcode() + { + return SENTINEL_FRAME; + } + + public boolean isControl(byte opcode) + { + return false; + } + + public boolean isText(byte opcode) + { + return (opcode&LENGTH_FRAME)==0; + } + + public boolean isBinary(byte opcode) + { + return (opcode&LENGTH_FRAME)!=0; + } + + public boolean isContinuation(byte opcode) + { + return false; + } + + public boolean isClose(byte opcode) + { + return false; + } + + public boolean isPing(byte opcode) + { + return false; + } + + public boolean isPong(byte opcode) + { + return false; + } } diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java index 40b5aacb4b6..8d870b26518 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketConnectionD06.java @@ -29,7 +29,6 @@ import org.eclipse.jetty.io.EndPoint; import org.eclipse.jetty.io.nio.SelectChannelEndPoint; import org.eclipse.jetty.util.B64Code; import org.eclipse.jetty.util.StringUtil; -import org.eclipse.jetty.util.TypeUtil; import org.eclipse.jetty.util.Utf8StringBuilder; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.websocket.WebSocket.OnFrame; @@ -39,25 +38,25 @@ import org.eclipse.jetty.websocket.WebSocket.OnControl; public class WebSocketConnectionD06 extends AbstractConnection implements WebSocketConnection { - public final static byte OP_CONTINUATION = 0x00; - public final static byte OP_CLOSE = 0x01; - public final static byte OP_PING = 0x02; - public final static byte OP_PONG = 0x03; - public final static byte OP_TEXT = 0x04; - public final static byte OP_BINARY = 0x05; + final static byte OP_CONTINUATION = 0x00; + final static byte OP_CLOSE = 0x01; + final static byte OP_PING = 0x02; + final static byte OP_PONG = 0x03; + final static byte OP_TEXT = 0x04; + final static byte OP_BINARY = 0x05; - public final static int CLOSE_NORMAL=1000; - public final static int CLOSE_SHUTDOWN=1001; - public final static int CLOSE_PROTOCOL=1002; - public final static int CLOSE_BADDATA=1003; - public final static int CLOSE_LARGE=1004; + final static int CLOSE_NORMAL=1000; + final static int CLOSE_SHUTDOWN=1001; + final static int CLOSE_PROTOCOL=1002; + final static int CLOSE_BADDATA=1003; + final static int CLOSE_LARGE=1004; - public static boolean isLastFrame(int flags) + static boolean isLastFrame(int flags) { return (flags&0x8)!=0; } - public static boolean isControlFrame(int opcode) + static boolean isControlFrame(int opcode) { switch(opcode) { @@ -98,333 +97,16 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc } } - private final WebSocketParser.FrameHandler _frameHandler= new WebSocketParser.FrameHandler() - { - private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); - private ByteArrayBuffer _aggregate; - private byte _opcode=-1; - - public void onFrame(byte flags, byte opcode, Buffer buffer) - { - boolean more=(flags&0x8)==0; - - synchronized(WebSocketConnectionD06.this) - { - // Ignore incoming after a close - if (_closedIn) - return; - - try - { - byte[] array=buffer.array(); - - // Deliver frame if websocket is a FrameWebSocket - if (_onFrame!=null) - { - if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) - return; - } - - if (_onControl!=null && isControlFrame(opcode)) - { - if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) - return; - } - - switch(opcode) - { - case WebSocketConnectionD06.OP_CONTINUATION: - { - // If text, append to the message buffer - if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0) - { - if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) - { - // If this is the last fragment, deliver the text buffer - if (more && _onTextMessage!=null) - { - _opcode=-1; - String msg =_utf8.toString(); - _utf8.reset(); - _onTextMessage.onMessage(msg); - } - } - else - { - _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); - _utf8.reset(); - _opcode=-1; - } - - } - else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) - { - if (_aggregate.space()<_aggregate.length()) - { - _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); - _aggregate.clear(); - _opcode=-1; - } - else - { - _aggregate.put(buffer); - - // If this is the last fragment, deliver - if (!more && _onBinaryMessage!=null) - { - try - { - _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); - } - finally - { - _opcode=-1; - _aggregate.clear(); - } - } - } - } - break; - } - case WebSocketConnectionD06.OP_PING: - { - Log.debug("PING {}",this); - if (!_closedOut) - _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); - break; - } - - case WebSocketConnectionD06.OP_PONG: - { - Log.debug("PONG {}",this); - break; - } - - case WebSocketConnectionD06.OP_CLOSE: - { - int code=-1; - String message=null; - if (buffer.length()>=2) - { - code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1]; - if (buffer.length()>2) - message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8); - } - closeIn(code,message); - break; - } - - - case WebSocketConnectionD06.OP_TEXT: - { - if(_onTextMessage!=null) - { - if (more) - { - if (_connection.getMaxTextMessageSize()>=0) - { - // If this is a text fragment, append to buffer - if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) - _opcode=WebSocketConnectionD06.OP_TEXT; - else - { - _utf8.reset(); - _opcode=-1; - _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); - } - } - } - else - { - // Deliver the message - _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); - } - } - break; - } - - default: - { - if (_onBinaryMessage!=null) - { - if (more) - { - if (_connection.getMaxBinaryMessageSize()>=0) - { - if (buffer.length()>_connection.getMaxBinaryMessageSize()) - { - _connection.disconnect(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); - if (_aggregate!=null) - _aggregate.clear(); - _opcode=-1; - } - else - { - _opcode=opcode; - if (_aggregate==null) - _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); - _aggregate.put(buffer); - } - } - } - else - { - _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); - } - } - } - } - } - catch(ThreadDeath th) - { - throw th; - } - catch(Throwable th) - { - Log.warn(th); - } - } - } - - public void close(int code,String message) - { - } - - public String toString() - { - return WebSocketConnectionD06.this.toString()+"FH"; - } - - }; + private final WebSocketParser.FrameHandler _frameHandler= new FrameHandlerD06(); /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - private final WebSocket.Connection _connection = new WebSocket.Connection() - { - volatile boolean _disconnecting; - int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize; - int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize; - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String) - */ - public synchronized void sendMessage(String content) throws IOException - { - if (_closedOut) - throw new IOException("closing"); - byte[] data = content.getBytes(StringUtil.__UTF8); - _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime()); - _generator.flush(); - checkWriteable(); - _idle.access(_endp); - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int) - */ - public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException - { - if (_closedOut) - throw new IOException("closing"); - _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length,_endp.getMaxIdleTime()); - _generator.flush(); - checkWriteable(); - _idle.access(_endp); - } - - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int) - */ - public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException - { - if (_closedOut) - throw new IOException("closing"); - _generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime()); - _generator.flush(); - checkWriteable(); - _idle.access(_endp); - } - - /* ------------------------------------------------------------ */ - public void sendControl(byte control, byte[] data, int offset, int length) throws IOException - { - if (_closedOut) - throw new IOException("closing"); - _generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime()); - _generator.flush(); - checkWriteable(); - _idle.access(_endp); - } - - /* ------------------------------------------------------------ */ - public boolean isMore(byte flags) - { - return (flags&0x8)==0; - } - - /* ------------------------------------------------------------ */ - public boolean isOpen() - { - return _endp!=null&&_endp.isOpen(); - } - - /* ------------------------------------------------------------ */ - public void disconnect(int code, String message) - { - if (_disconnecting) - return; - _disconnecting=true; - WebSocketConnectionD06.this.closeOut(code,message); - } - - /* ------------------------------------------------------------ */ - public void disconnect() - { - if (_disconnecting) - return; - _disconnecting=true; - WebSocketConnectionD06.this.closeOut(1000,null); - } - - /* ------------------------------------------------------------ */ - public void setMaxTextMessageSize(int size) - { - _maxTextMessage=size; - } - - /* ------------------------------------------------------------ */ - public void setMaxBinaryMessageSize(int size) - { - _maxBinaryMessage=size; - } - - /* ------------------------------------------------------------ */ - public int getMaxTextMessageSize() - { - return _maxTextMessage; - } - - /* ------------------------------------------------------------ */ - public int getMaxBinaryMessageSize() - { - return _maxBinaryMessage; - } - - /* ------------------------------------------------------------ */ - public String getProtocol() - { - return _protocol; - } - - }; + private final WebSocket.FrameConnection _connection = new FrameConnectionD06(); /* ------------------------------------------------------------ */ - public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol, int draft) + public WebSocketConnectionD06(WebSocket websocket, EndPoint endpoint, WebSocketBuffers buffers, long timestamp, int maxIdleTime, String protocol) throws IOException { super(endpoint,timestamp); @@ -518,6 +200,8 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc _idle.access(_endp); if (_closedIn && _closedOut && _generator.isBufferEmpty()) _endp.close(); + else if (_endp.isInputShutdown() && !_closedIn) + closeIn(CLOSE_PROTOCOL,null); else checkWriteable(); } @@ -548,7 +232,7 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc /* ------------------------------------------------------------ */ public void closed() { - _webSocket.onDisconnect(WebSocketConnectionD06.CLOSE_NORMAL,""); + _webSocket.onClose(WebSocketConnectionD06.CLOSE_NORMAL,""); } /* ------------------------------------------------------------ */ @@ -612,7 +296,392 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc private void checkWriteable() { if (!_generator.isBufferEmpty() && _endp instanceof AsyncEndPoint) + { ((AsyncEndPoint)_endp).scheduleWrite(); + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private class FrameConnectionD06 implements WebSocket.FrameConnection + { + volatile boolean _disconnecting; + int _maxTextMessage=WebSocketConnectionD06.this._maxTextMessageSize; + int _maxBinaryMessage=WebSocketConnectionD06.this._maxBinaryMessageSize; + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, java.lang.String) + */ + public synchronized void sendMessage(String content) throws IOException + { + if (_closedOut) + throw new IOException("closing"); + byte[] data = content.getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_TEXT,data,0,data.length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendMessage(byte, byte[], int, int) + */ + public synchronized void sendMessage(byte[] content, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closing"); + _generator.addFrame((byte)0x8,WebSocketConnectionD06.OP_BINARY,content,offset,length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.websocket.WebSocketConnection#sendFrame(boolean, byte, byte[], int, int) + */ + public void sendFrame(byte flags,byte opcode, byte[] content, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closing"); + _generator.addFrame(flags,opcode,content,offset,length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public void sendControl(byte control, byte[] data, int offset, int length) throws IOException + { + if (_closedOut) + throw new IOException("closing"); + _generator.addFrame((byte)0x8,control,data,offset,length,_endp.getMaxIdleTime()); + _generator.flush(); + checkWriteable(); + _idle.access(_endp); + } + + /* ------------------------------------------------------------ */ + public boolean isMessageComplete(byte flags) + { + return isLastFrame(flags); + } + + /* ------------------------------------------------------------ */ + public boolean isOpen() + { + return _endp!=null&&_endp.isOpen(); + } + + /* ------------------------------------------------------------ */ + public void close(int code, String message) + { + if (_disconnecting) + return; + _disconnecting=true; + WebSocketConnectionD06.this.closeOut(code,message); + } + + /* ------------------------------------------------------------ */ + public void setMaxTextMessageSize(int size) + { + _maxTextMessage=size; + } + + /* ------------------------------------------------------------ */ + public void setMaxBinaryMessageSize(int size) + { + _maxBinaryMessage=size; + } + + /* ------------------------------------------------------------ */ + public int getMaxTextMessageSize() + { + return _maxTextMessage; + } + + /* ------------------------------------------------------------ */ + public int getMaxBinaryMessageSize() + { + return _maxBinaryMessage; + } + + /* ------------------------------------------------------------ */ + public String getProtocol() + { + return _protocol; + } + + /* ------------------------------------------------------------ */ + public byte binaryOpcode() + { + return OP_BINARY; + } + + /* ------------------------------------------------------------ */ + public byte textOpcode() + { + return OP_TEXT; + } + + /* ------------------------------------------------------------ */ + public boolean isControl(byte opcode) + { + return isControlFrame(opcode); + } + + /* ------------------------------------------------------------ */ + public boolean isText(byte opcode) + { + return opcode==OP_TEXT; + } + + /* ------------------------------------------------------------ */ + public boolean isBinary(byte opcode) + { + return opcode==OP_BINARY; + } + + /* ------------------------------------------------------------ */ + public boolean isContinuation(byte opcode) + { + return opcode==OP_CONTINUATION; + } + + /* ------------------------------------------------------------ */ + public boolean isClose(byte opcode) + { + return opcode==OP_CLOSE; + } + + /* ------------------------------------------------------------ */ + public boolean isPing(byte opcode) + { + return opcode==OP_PING; + } + + /* ------------------------------------------------------------ */ + public boolean isPong(byte opcode) + { + return opcode==OP_PONG; + } + + /* ------------------------------------------------------------ */ + public void disconnect() + { + close(CLOSE_NORMAL,null); + } + + /* ------------------------------------------------------------ */ + public String toString() + { + return this.getClass().getSimpleName()+"@"+_endp.getLocalAddr()+":"+_endp.getLocalPort()+"<->"+_endp.getRemoteAddr()+":"+_endp.getRemotePort(); + } + } + + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + /* ------------------------------------------------------------ */ + private class FrameHandlerD06 implements WebSocketParser.FrameHandler + { + private final Utf8StringBuilder _utf8 = new Utf8StringBuilder(); + private ByteArrayBuffer _aggregate; + private byte _opcode=-1; + + public void onFrame(byte flags, byte opcode, Buffer buffer) + { + boolean more=(flags&0x8)==0; + + synchronized(WebSocketConnectionD06.this) + { + // Ignore incoming after a close + if (_closedIn) + return; + + try + { + byte[] array=buffer.array(); + + // Deliver frame if websocket is a FrameWebSocket + if (_onFrame!=null) + { + if (_onFrame.onFrame(flags,opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + if (_onControl!=null && isControlFrame(opcode)) + { + if (_onControl.onControl(opcode,array,buffer.getIndex(),buffer.length())) + return; + } + + switch(opcode) + { + case WebSocketConnectionD06.OP_CONTINUATION: + { + // If text, append to the message buffer + if (_opcode==WebSocketConnectionD06.OP_TEXT && _connection.getMaxTextMessageSize()>=0) + { + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + { + // If this is the last fragment, deliver the text buffer + if (more && _onTextMessage!=null) + { + _opcode=-1; + String msg =_utf8.toString(); + _utf8.reset(); + _onTextMessage.onMessage(msg); + } + } + else + { + _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); + _utf8.reset(); + _opcode=-1; + } + + } + else if (_opcode>=0 && _connection.getMaxBinaryMessageSize()>=0) + { + if (_aggregate.space()<_aggregate.length()) + { + _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + _aggregate.clear(); + _opcode=-1; + } + else + { + _aggregate.put(buffer); + + // If this is the last fragment, deliver + if (!more && _onBinaryMessage!=null) + { + try + { + _onBinaryMessage.onMessage(_aggregate.array(),_aggregate.getIndex(),_aggregate.length()); + } + finally + { + _opcode=-1; + _aggregate.clear(); + } + } + } + } + break; + } + case WebSocketConnectionD06.OP_PING: + { + Log.debug("PING {}",this); + if (!_closedOut) + _connection.sendControl(WebSocketConnectionD06.OP_PONG,buffer.array(),buffer.getIndex(),buffer.length()); + break; + } + + case WebSocketConnectionD06.OP_PONG: + { + Log.debug("PONG {}",this); + break; + } + + case WebSocketConnectionD06.OP_CLOSE: + { + int code=-1; + String message=null; + if (buffer.length()>=2) + { + code=buffer.array()[buffer.getIndex()]*0xff+buffer.array()[buffer.getIndex()+1]; + if (buffer.length()>2) + message=new String(buffer.array(),buffer.getIndex()+2,buffer.length()-2,StringUtil.__UTF8); + } + closeIn(code,message); + break; + } + + + case WebSocketConnectionD06.OP_TEXT: + { + if(_onTextMessage!=null) + { + if (more) + { + if (_connection.getMaxTextMessageSize()>=0) + { + // If this is a text fragment, append to buffer + if (_utf8.append(buffer.array(),buffer.getIndex(),buffer.length(),_connection.getMaxTextMessageSize())) + _opcode=WebSocketConnectionD06.OP_TEXT; + else + { + _utf8.reset(); + _opcode=-1; + _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Text message size > "+_connection.getMaxTextMessageSize()+" chars"); + } + } + } + else + { + // Deliver the message + _onTextMessage.onMessage(buffer.toString(StringUtil.__UTF8)); + } + } + break; + } + + default: + { + if (_onBinaryMessage!=null) + { + if (more) + { + if (_connection.getMaxBinaryMessageSize()>=0) + { + if (buffer.length()>_connection.getMaxBinaryMessageSize()) + { + _connection.close(WebSocketConnectionD06.CLOSE_LARGE,"Message size > "+_connection.getMaxBinaryMessageSize()); + if (_aggregate!=null) + _aggregate.clear(); + _opcode=-1; + } + else + { + _opcode=opcode; + if (_aggregate==null) + _aggregate=new ByteArrayBuffer(_connection.getMaxBinaryMessageSize()); + _aggregate.put(buffer); + } + } + } + else + { + _onBinaryMessage.onMessage(array,buffer.getIndex(),buffer.length()); + } + } + } + } + } + catch(ThreadDeath th) + { + throw th; + } + catch(Throwable th) + { + Log.warn(th); + } + } + } + + public void close(int code,String message) + { + _connection.close(code,message); + } + + public String toString() + { + return WebSocketConnectionD06.this.toString()+"FH"; + } } /* ------------------------------------------------------------ */ @@ -637,7 +706,9 @@ public class WebSocketConnectionD06 extends AbstractConnection implements WebSoc response.addHeader("Sec-WebSocket-Protocol",subprotocol); response.sendError(101); - _webSocket.onConnect(_connection); + if (_onFrame!=null) + _onFrame.onHandshake(_connection); + _webSocket.onOpen(_connection); } /* ------------------------------------------------------------ */ diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java index 610ffd90c93..15ad5b90029 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketFactory.java @@ -21,6 +21,7 @@ import org.eclipse.jetty.http.HttpException; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.io.ConnectedEndPoint; import org.eclipse.jetty.server.HttpConnection; +import org.eclipse.jetty.util.log.Log; /** * Factory to create WebSocket connections @@ -120,16 +121,16 @@ public class WebSocketFactory final WebSocketConnection connection; switch (draft) { - case 6: - connection = new WebSocketConnectionD06(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, draft); + case -1: + case 0: + connection = new WebSocketConnectionD00(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol); + break; + case 6: + connection = new WebSocketConnectionD06(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol); break; - case 5: - case 4: - case 3: - case 2: - throw new HttpException(400, "Unsupported draft specification: " + draft); default: - connection = new WebSocketConnectionD00(websocket, endp, _buffers, http.getTimeStamp(), _maxIdleTime, protocol, draft); + Log.warn("Unsupported Websocket version: "+draft); + throw new HttpException(400, "Unsupported draft specification: " + draft); } // Let the connection finish processing the handshake diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java index c9f6f09a0ed..1cc46484f4f 100644 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java +++ b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00.java @@ -131,7 +131,7 @@ public class WebSocketGeneratorD00 implements WebSocketGenerator if (!_endp.isOpen()) throw new EofException(); - if (_buffer!=null) + if (_buffer!=null && _buffer.hasContent()) return _endp.flush(_buffer); return 0; diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java deleted file mode 100644 index 94a8ca7179a..00000000000 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketGeneratorD01.java +++ /dev/null @@ -1,181 +0,0 @@ -// ======================================================================== -// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== - -package org.eclipse.jetty.websocket; - -import java.io.IOException; - -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.io.EofException; - - -/* ------------------------------------------------------------ */ -/** WebSocketGenerator. - * This class generates websocket packets. - * It is fully synchronized because it is likely that async - * threads will call the addMessage methods while other - * threads are flushing the generator. - */ -public class WebSocketGeneratorD01 implements WebSocketGenerator -{ - final private WebSocketBuffers _buffers; - final private EndPoint _endp; - private Buffer _buffer; - - public WebSocketGeneratorD01(WebSocketBuffers buffers, EndPoint endp) - { - _buffers=buffers; - _endp=endp; - } - - public synchronized void addFrame(byte flags,byte opcode, byte[] content, int offset, int length, int blockFor) throws IOException - { - if (_buffer==null) - _buffer=_buffers.getDirectBuffer(); - - if (_buffer.space() == 0) - expelBuffer(blockFor); - - opcode = (byte)(opcode & 0x0f); - - while (length>0) - { - // slice a fragment off - int fragment=length; - if (fragment+10>_buffer.capacity()) - { - fragment=_buffer.capacity()-10; - bufferPut((byte)(0x80|opcode), blockFor); - } - else if ((flags&0x8)==0) - bufferPut(opcode, blockFor); - else - bufferPut((byte)(0x80|opcode), blockFor); - - if (fragment>0xffff) - { - bufferPut((byte)0x7f, blockFor); - bufferPut((byte)((fragment>>56)&0x7f), blockFor); - bufferPut((byte)((fragment>>48)&0xff), blockFor); - bufferPut((byte)((fragment>>40)&0xff), blockFor); - bufferPut((byte)((fragment>>32)&0xff), blockFor); - bufferPut((byte)((fragment>>24)&0xff), blockFor); - bufferPut((byte)((fragment>>16)&0xff), blockFor); - bufferPut((byte)((fragment>>8)&0xff), blockFor); - bufferPut((byte)(fragment&0xff), blockFor); - } - else if (fragment >=0x7e) - { - bufferPut((byte)126, blockFor); - bufferPut((byte)(fragment>>8), blockFor); - bufferPut((byte)(fragment&0xff), blockFor); - } - else - { - bufferPut((byte)fragment, blockFor); - } - - int remaining = fragment; - while (remaining > 0) - { - _buffer.compact(); - int chunk = remaining < _buffer.space() ? remaining : _buffer.space(); - _buffer.put(content, offset + (fragment - remaining), chunk); - remaining -= chunk; - if (_buffer.space() > 0) - { - // Gently flush the data, issuing a non-blocking write - flushBuffer(); - } - else - { - // Forcibly flush the data, issuing a blocking write - expelBuffer(blockFor); - if (remaining == 0) - { - // Gently flush the data, issuing a non-blocking write - flushBuffer(); - } - } - } - offset+=fragment; - length-=fragment; - } - } - - private synchronized void bufferPut(byte datum, long blockFor) throws IOException - { - if (_buffer==null) - _buffer=_buffers.getDirectBuffer(); - _buffer.put(datum); - if (_buffer.space() == 0) - expelBuffer(blockFor); - } - - public synchronized int flush(int blockFor) throws IOException - { - return expelBuffer(blockFor); - } - - public synchronized int flush() throws IOException - { - int flushed = flushBuffer(); - if (_buffer!=null && _buffer.length()==0) - { - _buffers.returnBuffer(_buffer); - _buffer=null; - } - return flushed; - } - - private synchronized int flushBuffer() throws IOException - { - if (!_endp.isOpen()) - throw new EofException(); - - if (_buffer!=null) - return _endp.flush(_buffer); - - return 0; - } - - private synchronized int expelBuffer(long blockFor) throws IOException - { - if (_buffer==null) - return 0; - int result = flushBuffer(); - _buffer.compact(); - if (!_endp.isBlocking()) - { - while (_buffer.space()==0) - { - // TODO: in case the I/O system signals write ready, but when we attempt to write we cannot - // TODO: we should decrease the blockFor timeout instead of waiting again the whole timeout - boolean ready = _endp.blockWritable(blockFor); - if (!ready) - throw new IOException("Write timeout"); - - result += flushBuffer(); - _buffer.compact(); - } - } - return result; - } - - public synchronized boolean isBufferEmpty() - { - return _buffer==null || _buffer.length()==0; - } - -} diff --git a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java b/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java deleted file mode 100644 index 72f24f09414..00000000000 --- a/jetty-websocket/src/main/java/org/eclipse/jetty/websocket/WebSocketParserD01.java +++ /dev/null @@ -1,224 +0,0 @@ -// ======================================================================== -// Copyright (c) 2010 Mort Bay Consulting Pty. Ltd. -// ------------------------------------------------------------------------ -// All rights reserved. This program and the accompanying materials -// are made available under the terms of the Eclipse Public License v1.0 -// and Apache License v2.0 which accompanies this distribution. -// The Eclipse Public License is available at -// http://www.eclipse.org/legal/epl-v10.html -// The Apache License v2.0 is available at -// http://www.opensource.org/licenses/apache2.0.php -// You may elect to redistribute this code under either of these licenses. -// ======================================================================== - -package org.eclipse.jetty.websocket; - -import java.io.IOException; - -import org.eclipse.jetty.io.Buffer; -import org.eclipse.jetty.io.Buffers; -import org.eclipse.jetty.io.EndPoint; -import org.eclipse.jetty.util.Utf8StringBuilder; -import org.eclipse.jetty.util.log.Log; - - - -/* ------------------------------------------------------------ */ -/** - * Parser the WebSocket protocol. - * - */ -public class WebSocketParserD01 implements WebSocketParser -{ - public enum State { - START(1), LENGTH_7(2), LENGTH_16(4), LENGTH_63(10), DATA(10); - - int _minSize; - - State(int minSize) - { - _minSize=minSize; - } - - int getMinSize() - { - return _minSize; - } - }; - - - private final WebSocketBuffers _buffers; - private final EndPoint _endp; - private final FrameHandler _handler; - private State _state=State.START; - private Buffer _buffer; - private byte _flags; - private byte _opcode; - private int _count; - private long _length; - private Utf8StringBuilder _utf8; - - /* ------------------------------------------------------------ */ - /** - * @param buffers The buffers to use for parsing. Only the {@link Buffers#getBuffer()} is used. - * This should be a direct buffer if binary data is mostly used or an indirect buffer if utf-8 data - * is mostly used. - * @param endp - * @param handler - */ - public WebSocketParserD01(WebSocketBuffers buffers, EndPoint endp, FrameHandler handler) - { - _buffers=buffers; - _endp=endp; - _handler=handler; - } - - /* ------------------------------------------------------------ */ - public boolean isBufferEmpty() - { - return _buffer==null || _buffer.length()==0; - } - - /* ------------------------------------------------------------ */ - public Buffer getBuffer() - { - return _buffer; - } - - /* ------------------------------------------------------------ */ - /** Parse to next event. - * Parse to the next {@link WebSocketParser.FrameHandler} event or until no more data is - * available. Fill data from the {@link EndPoint} only as necessary. - * @return An indication of progress or otherwise. -1 indicates EOF, 0 indicates - * that no bytes were read and no messages parsed. A positive number indicates either - * the bytes filled or the messages parsed. - */ - public int parseNext() - { - if (_buffer==null) - _buffer=_buffers.getBuffer(); - - int total_filled=0; - - // Loop until an datagram call back or can't fill anymore - while(true) - { - int available=_buffer.length(); - - // Fill buffer if we need a byte or need length - if (available < _state.getMinSize() || _state==State.DATA && available<_count) - { - // compact to mark (set at start of data) - _buffer.compact(); - - // if no space, then the data is too big for buffer - if (_buffer.space() == 0) - throw new IllegalStateException("FULL"); - - // catch IOExceptions (probably EOF) and try to parse what we have - try - { - int filled=_endp.isOpen()?_endp.fill(_buffer):-1; - if (filled<=0) - return total_filled>0?total_filled:-1; - total_filled+=filled; - available=_buffer.length(); - } - catch(IOException e) - { - Log.debug(e); - return total_filled>0?total_filled:-1; - } - } - - // Parse the buffer byte by byte (unless it is STATE_DATA) - byte b; - while (_state!=State.DATA && available-->0) - { - switch (_state) - { - case START: - b=_buffer.get(); - _opcode=(byte)(b&0xf); - _flags=(byte)(b>>4); - _state=State.LENGTH_7; - continue; - - case LENGTH_7: - b=_buffer.get(); - switch(b) - { - case 127: - _length=0; - _count=8; - _state=State.LENGTH_63; - break; - case 126: - _length=0; - _count=2; - _state=State.LENGTH_16; - break; - default: - _length=(0x7f&b); - _count=(int)_length; - _state=State.DATA; - } - continue; - - case LENGTH_16: - b=_buffer.get(); - _length = _length<<8 | b; - if (--_count==0) - { - if (_length>=_buffer.capacity()-4) - throw new IllegalStateException("TOO LARGE"); - _count=(int)_length; - _state=State.DATA; - } - continue; - - case LENGTH_63: - b=_buffer.get(); - _length = _length<<8 | b; - if (--_count==0) - { - if (_length>=_buffer.capacity()-10) - throw new IllegalStateException("TOO LARGE"); - _count=(int)_length; - _state=State.DATA; - } - continue; - } - } - - if (_state==State.DATA && available>=_count) - { - _handler.onFrame(_flags, _opcode, _buffer.get(_count)); - _count=0; - _state=State.START; - - if (_buffer.length()==0) - { - _buffers.returnBuffer(_buffer); - _buffer=null; - } - - return total_filled; - - } - } - } - - /* ------------------------------------------------------------ */ - public void fill(Buffer buffer) - { - if (buffer!=null && buffer.length()>0) - { - if (_buffer==null) - _buffer=_buffers.getBuffer(); - _buffer.put(buffer); - buffer.clear(); - } - } - -} diff --git a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java index 68fb431169c..4602b153d07 100644 --- a/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java +++ b/jetty-websocket/src/test/java/org/eclipse/jetty/websocket/WebSocketGeneratorD00Test.java @@ -21,7 +21,7 @@ public class WebSocketGeneratorD00Test { WebSocketBuffers buffers = new WebSocketBuffers(1024); ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); - _generator = new WebSocketGeneratorD01(buffers, endPoint); + _generator = new WebSocketGeneratorD00(buffers, endPoint); _out = new ByteArrayBuffer(4096); endPoint.setOut(_out); } @@ -32,7 +32,32 @@ public class WebSocketGeneratorD00Test byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8); _generator.addFrame((byte)0x0,(byte)0x04,data,0,data.length,0); _generator.flush(); - assertEquals(4,_out.get()); + assertEquals((byte)0x04,_out.get()); + assertEquals('H',_out.get()); + assertEquals('e',_out.get()); + assertEquals('l',_out.get()); + assertEquals('l',_out.get()); + assertEquals(0xEF,0xff&_out.get()); + assertEquals(0xBD,0xff&_out.get()); + assertEquals(0x8F,0xff&_out.get()); + assertEquals(' ',_out.get()); + assertEquals('W',_out.get()); + assertEquals(0xEF,0xff&_out.get()); + assertEquals(0xBD,0xff&_out.get()); + assertEquals(0x8F,0xff&_out.get()); + assertEquals('r',_out.get()); + assertEquals('l',_out.get()); + assertEquals('d',_out.get()); + assertEquals((byte)0xff,_out.get()); + } + + @Test + public void testOneBinaryString() throws Exception + { + byte[] data="Hell\uFF4F W\uFF4Frld".getBytes(StringUtil.__UTF8); + _generator.addFrame((byte)0x0,(byte)0x84,data,0,data.length,0); + _generator.flush(); + assertEquals((byte)0x84,_out.get()); assertEquals(15,_out.get()); assertEquals('H',_out.get()); assertEquals('e',_out.get()); @@ -51,64 +76,4 @@ public class WebSocketGeneratorD00Test assertEquals('d',_out.get()); } - @Test - public void testOneMediumBuffer() throws Exception - { - byte[] b=new byte[501]; - for (int i=0;i>8),0xff&_out.get()); - assertEquals(0xff&b.length,0xff&_out.get()); - for (int i=0;i>7),0xff&_out.get()); - assertEquals(0x7f&b.length,0xff&_out.get()); - for (int i=0;i>8)); - _in.put((byte)(bytes.length&0xff)); - _in.put(bytes); - - int filled =_parser.parseNext(); - - assertEquals(bytes.length+4,filled); - assertEquals(string,_handler._data.get(0)); - assertTrue(_parser.isBufferEmpty()); - assertTrue(_parser.getBuffer()==null); - } - - @Test - public void testLongText() throws Exception - { - - WebSocketBuffers buffers = new WebSocketBuffers(0x20000); - ByteArrayEndPoint endPoint = new ByteArrayEndPoint(); - WebSocketParser parser=new WebSocketParserD01(buffers, endPoint,_handler); - ByteArrayBuffer in = new ByteArrayBuffer(0x20000); - endPoint.setIn(in); - - - String string = "Hell\uFF4f Big W\uFF4Frld "; - for (int i=0;i<12;i++) - string = string+string; - string += ". The end."; - - byte[] bytes = string.getBytes("UTF-8"); - - in.put((byte)0x00); - in.put((byte)0x7F); - in.put((byte)0x00); - in.put((byte)0x00); - in.put((byte)0x00); - in.put((byte)0x00); - in.put((byte)0x00); - in.put((byte)(bytes.length>>16)); - in.put((byte)((bytes.length>>8)&0xff)); - in.put((byte)(bytes.length&0xff)); - in.put(bytes); - - int filled =parser.parseNext(); - - assertEquals(bytes.length+10,filled); - assertEquals(string,_handler._data.get(0)); - assertTrue(parser.isBufferEmpty()); - assertTrue(parser.getBuffer()==null); - } - - @Test - public void testShortFragmentTest() throws Exception - { - _in.put((byte)0x80); - _in.put((byte)0x06); - _in.put("Hello ".getBytes(StringUtil.__UTF8)); - _in.put((byte)0x00); - _in.put((byte)0x05); - _in.put("World".getBytes(StringUtil.__UTF8)); - - int filled =_parser.parseNext(); - - assertEquals(15,filled); - assertEquals(0,_handler._data.size()); - assertFalse(_parser.isBufferEmpty()); - assertFalse(_parser.getBuffer()==null); - - filled =_parser.parseNext(); - - assertEquals(0,filled); - assertEquals("Hello World",_handler._data.get(0)); - assertTrue(_parser.isBufferEmpty()); - assertTrue(_parser.getBuffer()==null); - } - - - private class Handler implements WebSocketParser.FrameHandler - { - Utf8StringBuilder _utf8 = new Utf8StringBuilder(); - public List _data = new ArrayList(); - - public void onFrame(byte flags, byte opcode, Buffer buffer) - { - if ((flags&0x8)!=0) - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - else if (_utf8.length()==0) - _data.add(opcode,buffer.toString("utf-8")); - else - { - _utf8.append(buffer.array(),buffer.getIndex(),buffer.length()); - _data.add(opcode,_utf8.toString()); - _utf8.reset(); - } - } - - public void close(int code,String message) - { - } - } -} diff --git a/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java b/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java index 9b8e8f544e7..51111a1bc1b 100644 --- a/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java +++ b/test-jetty-webapp/src/main/java/com/acme/WebSocketChatServlet.java @@ -35,7 +35,7 @@ public class WebSocketChatServlet extends WebSocketServlet { Connection _connection; - public void onConnect(Connection connection) + public void onOpen(Connection connection) { // Log.info(this+" onConnect"); _connection=connection; @@ -50,7 +50,7 @@ public class WebSocketChatServlet extends WebSocketServlet public void onMessage(String data) { if (data.indexOf("disconnect")>=0) - _connection.disconnect(0,""); + _connection.disconnect(); else { // Log.info(this+" onMessage: "+data); @@ -68,7 +68,7 @@ public class WebSocketChatServlet extends WebSocketServlet } } - public void onDisconnect(int code, String message) + public void onClose(int code, String message) { // Log.info(this+" onDisconnect"); _members.remove(this);