diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index b65bc2da26f..df8cc815763 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -25,6 +25,7 @@ import org.eclipse.jetty.http.HttpGenerator.ResponseInfo; import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.io.AbstractAsyncConnection; +import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.EofException; @@ -42,6 +43,8 @@ public class HttpConnection extends AbstractAsyncConnection private static final ThreadLocal __currentConnection = new ThreadLocal(); + public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE"; + private final Object _lock = this; private final Server _server; private final HttpConnector _connector; @@ -51,6 +54,7 @@ public class HttpConnection extends AbstractAsyncConnection private final ByteBufferPool _bufferPool; private final HttpHttpInput _httpInput; + private volatile Thread _thread; private ResponseInfo _info; ByteBuffer _requestBuffer=null; ByteBuffer _responseHeader=null; @@ -112,6 +116,12 @@ public class HttpConnection extends AbstractAsyncConnection return _server; } + /* ------------------------------------------------------------ */ + public Thread getThread() + { + return _thread; + } + /* ------------------------------------------------------------ */ /** * @return Returns the connector. @@ -213,6 +223,7 @@ public class HttpConnection extends AbstractAsyncConnection try { + _thread=Thread.currentThread(); setCurrentConnection(this); // TODO try to generalize this loop into AbstractAsyncConnection @@ -303,6 +314,7 @@ public class HttpConnection extends AbstractAsyncConnection } finally { + _thread=null; setCurrentConnection(null); } } @@ -319,7 +331,7 @@ public class HttpConnection extends AbstractAsyncConnection /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - private class HttpChannelOverHttp extends HttpChannel + private class HttpChannelOverHttp extends HttpChannel implements Runnable { private HttpChannelOverHttp(Server server) { @@ -409,7 +421,7 @@ public class HttpConnection extends AbstractAsyncConnection @Override protected synchronized void completed() { - // This is called by HttpChannel#process when it knows that it's handling of the request/response cycle + // This is called by HttpChannel#handle when it knows that it's handling of the request/response cycle // is complete. This may be in the original thread dispatched to the connection that has called process from // the connection#onReadable method, or it may be from a thread dispatched to call process as the result // of a resumed suspended request. @@ -418,16 +430,27 @@ public class HttpConnection extends AbstractAsyncConnection LOG.debug("{} completed"); - // TODO handle connection upgrade! + + // Handle connection upgrades + if (getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101) + { + AsyncConnection connection=(AsyncConnection)getRequest().getAttribute(UPGRADE_CONNECTION_ATTR); + if (connection!=null) + { + LOG.debug("Upgrade from {} to {}",this,connection); + getEndPoint().setAsyncConnection(connection); + HttpConnection.this.reset(); + return; + } + } + // Reset everything for the next cycle. HttpConnection.this.reset(); - // if the onReadable method is not executing - if (getCurrentConnection()==null) - { - // TODO is there a race here? - + // are called from non connection thread (ie dispatched from a resume) + if (getThread()!=Thread.currentThread()) + { if (_parser.isStart()) { // it wants to eat more @@ -437,13 +460,9 @@ public class HttpConnection extends AbstractAsyncConnection { LOG.debug("{} pipelined",this); - // TODO avoid temporary runnable try { - execute(new Runnable() - { - @Override public void run() {onFillable();} - }); + execute(this); } catch(RejectedExecutionException e) { @@ -469,6 +488,21 @@ public class HttpConnection extends AbstractAsyncConnection } } + /* ------------------------------------------------------------ */ + @Override + public void run() + { + if (getThread()!=null) + { + // dispatched thread is still executing, try again later + // TODO - this probably should not be able to occur as the resume dispatch is not done until an unhandle + LOG.warn("Dispatch while dispatched???"); + execute(this); + } + else + onFillable(); + } + /* ------------------------------------------------------------ */ private int generate(ByteBuffer content, Action action) throws IOException { diff --git a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java index 811995f773e..315f687cfb2 100644 --- a/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java +++ b/jetty-websocket/websocket-core/src/main/java/org/eclipse/jetty/websocket/io/WebSocketAsyncConnection.java @@ -167,6 +167,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements { LOG.debug("onOpen()"); super.onOpen(); + fillInterested(); } private void read(ByteBuffer buffer) diff --git a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java index 507cf4292b8..f22bc5f134b 100644 --- a/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java +++ b/jetty-websocket/websocket-server/src/main/java/org/eclipse/jetty/websocket/server/WebSocketServerFactory.java @@ -343,7 +343,8 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock Executor executor = http.getConnector().findExecutor(); ByteBufferPool bufferPool = http.getConnector().getByteBufferPool(); WebSocketAsyncConnection connection = new WebSocketAsyncConnection(endp,executor,websocket.getPolicy(),bufferPool); - endp.setAsyncConnection(connection); + // Tell jetty about the new connection + request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTR,connection); LOG.debug("HttpConnection: {}",http); LOG.debug("AsyncWebSocketConnection: {}",connection); @@ -359,7 +360,6 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock // Process (version specific) handshake response LOG.debug("Handshake Response: {}",handshaker); handshaker.doHandshakeResponse(request,response,extensions); - connection.fillInterested(); LOG.debug("EndPoint: {}",endp); LOG.debug("Handshake Complete: {}",connection); @@ -367,12 +367,11 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock addConnection(connection); // Notify POJO of connection + // TODO move to WebSocketAsyncConnection.onOpen websocket.setConnection(connection); websocket.onConnect(); - // Tell jetty about the new connection LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection); - request.setAttribute("org.eclipse.jetty.io.Connection",connection); // TODO: this still needed? return true; } }