Merge branch 'jetty-9' of ssh://git.eclipse.org/gitroot/jetty/org.eclipse.jetty.project into jetty-9

This commit is contained in:
Joakim Erdfelt 2012-07-05 12:21:06 -07:00
commit 502731d719
3 changed files with 51 additions and 17 deletions

View File

@ -25,6 +25,7 @@ import org.eclipse.jetty.http.HttpGenerator.ResponseInfo;
import org.eclipse.jetty.http.HttpParser; import org.eclipse.jetty.http.HttpParser;
import org.eclipse.jetty.http.HttpStatus; import org.eclipse.jetty.http.HttpStatus;
import org.eclipse.jetty.io.AbstractAsyncConnection; import org.eclipse.jetty.io.AbstractAsyncConnection;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.AsyncEndPoint; import org.eclipse.jetty.io.AsyncEndPoint;
import org.eclipse.jetty.io.ByteBufferPool; import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.EofException;
@ -42,6 +43,8 @@ public class HttpConnection extends AbstractAsyncConnection
private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<HttpConnection>(); private static final ThreadLocal<HttpConnection> __currentConnection = new ThreadLocal<HttpConnection>();
public static final String UPGRADE_CONNECTION_ATTR = "org.eclispe.jetty.server.HttpConnection.UPGRADE";
private final Object _lock = this; private final Object _lock = this;
private final Server _server; private final Server _server;
private final HttpConnector _connector; private final HttpConnector _connector;
@ -51,6 +54,7 @@ public class HttpConnection extends AbstractAsyncConnection
private final ByteBufferPool _bufferPool; private final ByteBufferPool _bufferPool;
private final HttpHttpInput _httpInput; private final HttpHttpInput _httpInput;
private volatile Thread _thread;
private ResponseInfo _info; private ResponseInfo _info;
ByteBuffer _requestBuffer=null; ByteBuffer _requestBuffer=null;
ByteBuffer _responseHeader=null; ByteBuffer _responseHeader=null;
@ -112,6 +116,12 @@ public class HttpConnection extends AbstractAsyncConnection
return _server; return _server;
} }
/* ------------------------------------------------------------ */
public Thread getThread()
{
return _thread;
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/** /**
* @return Returns the connector. * @return Returns the connector.
@ -213,6 +223,7 @@ public class HttpConnection extends AbstractAsyncConnection
try try
{ {
_thread=Thread.currentThread();
setCurrentConnection(this); setCurrentConnection(this);
// TODO try to generalize this loop into AbstractAsyncConnection // TODO try to generalize this loop into AbstractAsyncConnection
@ -303,6 +314,7 @@ public class HttpConnection extends AbstractAsyncConnection
} }
finally finally
{ {
_thread=null;
setCurrentConnection(null); setCurrentConnection(null);
} }
} }
@ -319,7 +331,7 @@ public class HttpConnection extends AbstractAsyncConnection
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private class HttpChannelOverHttp extends HttpChannel private class HttpChannelOverHttp extends HttpChannel implements Runnable
{ {
private HttpChannelOverHttp(Server server) private HttpChannelOverHttp(Server server)
{ {
@ -409,7 +421,7 @@ public class HttpConnection extends AbstractAsyncConnection
@Override @Override
protected synchronized void completed() protected synchronized void completed()
{ {
// This is called by HttpChannel#process when it knows that it's handling of the request/response cycle // This is called by HttpChannel#handle when it knows that it's handling of the request/response cycle
// is complete. This may be in the original thread dispatched to the connection that has called process from // is complete. This may be in the original thread dispatched to the connection that has called process from
// the connection#onReadable method, or it may be from a thread dispatched to call process as the result // the connection#onReadable method, or it may be from a thread dispatched to call process as the result
// of a resumed suspended request. // of a resumed suspended request.
@ -418,16 +430,27 @@ public class HttpConnection extends AbstractAsyncConnection
LOG.debug("{} completed"); LOG.debug("{} completed");
// TODO handle connection upgrade!
// Handle connection upgrades
if (getResponse().getStatus()==HttpStatus.SWITCHING_PROTOCOLS_101)
{
AsyncConnection connection=(AsyncConnection)getRequest().getAttribute(UPGRADE_CONNECTION_ATTR);
if (connection!=null)
{
LOG.debug("Upgrade from {} to {}",this,connection);
getEndPoint().setAsyncConnection(connection);
HttpConnection.this.reset();
return;
}
}
// Reset everything for the next cycle. // Reset everything for the next cycle.
HttpConnection.this.reset(); HttpConnection.this.reset();
// if the onReadable method is not executing // are called from non connection thread (ie dispatched from a resume)
if (getCurrentConnection()==null) if (getThread()!=Thread.currentThread())
{ {
// TODO is there a race here?
if (_parser.isStart()) if (_parser.isStart())
{ {
// it wants to eat more // it wants to eat more
@ -437,13 +460,9 @@ public class HttpConnection extends AbstractAsyncConnection
{ {
LOG.debug("{} pipelined",this); LOG.debug("{} pipelined",this);
// TODO avoid temporary runnable
try try
{ {
execute(new Runnable() execute(this);
{
@Override public void run() {onFillable();}
});
} }
catch(RejectedExecutionException e) catch(RejectedExecutionException e)
{ {
@ -469,6 +488,21 @@ public class HttpConnection extends AbstractAsyncConnection
} }
} }
/* ------------------------------------------------------------ */
@Override
public void run()
{
if (getThread()!=null)
{
// dispatched thread is still executing, try again later
// TODO - this probably should not be able to occur as the resume dispatch is not done until an unhandle
LOG.warn("Dispatch while dispatched???");
execute(this);
}
else
onFillable();
}
/* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */
private int generate(ByteBuffer content, Action action) throws IOException private int generate(ByteBuffer content, Action action) throws IOException
{ {

View File

@ -167,6 +167,7 @@ public class WebSocketAsyncConnection extends AbstractAsyncConnection implements
{ {
LOG.debug("onOpen()"); LOG.debug("onOpen()");
super.onOpen(); super.onOpen();
fillInterested();
} }
private void read(ByteBuffer buffer) private void read(ByteBuffer buffer)

View File

@ -343,7 +343,8 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
Executor executor = http.getConnector().findExecutor(); Executor executor = http.getConnector().findExecutor();
ByteBufferPool bufferPool = http.getConnector().getByteBufferPool(); ByteBufferPool bufferPool = http.getConnector().getByteBufferPool();
WebSocketAsyncConnection connection = new WebSocketAsyncConnection(endp,executor,websocket.getPolicy(),bufferPool); WebSocketAsyncConnection connection = new WebSocketAsyncConnection(endp,executor,websocket.getPolicy(),bufferPool);
endp.setAsyncConnection(connection); // Tell jetty about the new connection
request.setAttribute(HttpConnection.UPGRADE_CONNECTION_ATTR,connection);
LOG.debug("HttpConnection: {}",http); LOG.debug("HttpConnection: {}",http);
LOG.debug("AsyncWebSocketConnection: {}",connection); LOG.debug("AsyncWebSocketConnection: {}",connection);
@ -359,7 +360,6 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
// Process (version specific) handshake response // Process (version specific) handshake response
LOG.debug("Handshake Response: {}",handshaker); LOG.debug("Handshake Response: {}",handshaker);
handshaker.doHandshakeResponse(request,response,extensions); handshaker.doHandshakeResponse(request,response,extensions);
connection.fillInterested();
LOG.debug("EndPoint: {}",endp); LOG.debug("EndPoint: {}",endp);
LOG.debug("Handshake Complete: {}",connection); LOG.debug("Handshake Complete: {}",connection);
@ -367,12 +367,11 @@ public class WebSocketServerFactory extends AbstractLifeCycle implements WebSock
addConnection(connection); addConnection(connection);
// Notify POJO of connection // Notify POJO of connection
// TODO move to WebSocketAsyncConnection.onOpen
websocket.setConnection(connection); websocket.setConnection(connection);
websocket.onConnect(); websocket.onConnect();
// Tell jetty about the new connection
LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection); LOG.debug("Websocket upgrade {} {} {} {}",request.getRequestURI(),version,response.getAcceptedSubProtocol(),connection);
request.setAttribute("org.eclipse.jetty.io.Connection",connection); // TODO: this still needed?
return true; return true;
} }
} }