jetty-9 Wired up AsyncContext and Continuations

This commit is contained in:
Greg Wilkins 2012-05-17 16:26:11 +02:00
parent 60b38b7775
commit c8df5b3477
9 changed files with 130 additions and 67 deletions

View File

@ -40,7 +40,6 @@ import org.eclipse.jetty.http.HttpVersion;
import org.eclipse.jetty.http.MimeTypes;
import org.eclipse.jetty.io.AsyncConnection;
import org.eclipse.jetty.io.EofException;
import org.eclipse.jetty.io.RuntimeIOException;
import org.eclipse.jetty.io.UncheckedPrintWriter;
import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.StringUtil;
@ -70,26 +69,27 @@ public abstract class HttpChannel
__currentChannel.set(channel);
}
private int _requests;
private final Server _server;
private final AsyncConnection _connection;
private final HttpURI _uri;
private final ChannelEventHandler _handler = new ChannelEventHandler();
private final HttpChannelState _state;
private final HttpFields _requestFields;
private final Request _request;
private final AsyncContinuation _state;
private final HttpInput _in;
private final HttpFields _responseFields;
private final Response _response;
private final HttpInput _in;
private final Output _out;
private volatile HttpWriter _writer;
private volatile PrintWriter _printWriter;
private HttpWriter _writer;
private PrintWriter _printWriter;
int _include;
private int _requests;
private int _include;
private HttpVersion _version = HttpVersion.HTTP_1_1;
@ -99,9 +99,6 @@ public abstract class HttpChannel
private boolean _host = false;
private final RequestHandler _handler = new RequestHandler();
/* ------------------------------------------------------------ */
/** Constructor
*
@ -113,16 +110,17 @@ public abstract class HttpChannel
_uri = new HttpURI(URIUtil.__CHARSET);
_requestFields = new HttpFields();
_responseFields = new HttpFields(server.getMaxCookieVersion());
_state = new HttpChannelState(this);
_request = new Request(this);
_response = new Response(this);
_state = _request.getAsyncContinuation();
_in=input;
_out=new Output();
}
public interface EventHandler extends HttpParser.RequestHandler
/* ------------------------------------------------------------ */
public HttpChannelState getState()
{
ResponseInfo commit();
return _state;
}
/* ------------------------------------------------------------ */
@ -153,11 +151,6 @@ public abstract class HttpChannel
}
/* ------------------------------------------------------------ */
public AsyncContinuation getAsyncContinuation()
{
return _state;
}
/* ------------------------------------------------------------ */
/**
@ -302,7 +295,7 @@ public abstract class HttpChannel
}
/* ------------------------------------------------------------ */
protected void handleRequest() throws IOException
protected void handleRequest()
{
LOG.debug("{} handleRequest",this);
@ -397,9 +390,16 @@ public abstract class HttpChannel
}
if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(HttpServletResponse.SC_NOT_FOUND);
sendError(HttpServletResponse.SC_NOT_FOUND,null,null,false);
try
{
_response.complete();
}
catch(IOException e)
{
LOG.debug(e);
}
_request.setHandled(true);
completed();
}
@ -498,7 +498,7 @@ public abstract class HttpChannel
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
/* ------------------------------------------------------------ */
private class RequestHandler implements EventHandler
private class ChannelEventHandler implements EventHandler
{
@Override
public boolean startRequest(HttpMethod httpMethod,String method, String uri, HttpVersion version)
@ -772,7 +772,17 @@ public abstract class HttpChannel
protected abstract void completed();
protected abstract void execute(Runnable task);
public abstract Timer getTimer();
/* ------------------------------------------------------------ */
public interface EventHandler extends HttpParser.RequestHandler
{
ResponseInfo commit();
}
}

View File

@ -21,6 +21,7 @@ import java.util.TimerTask;
import javax.servlet.AsyncContext;
import javax.servlet.AsyncEvent;
import javax.servlet.AsyncListener;
import javax.servlet.RequestDispatcher;
import javax.servlet.ServletContext;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
@ -39,9 +40,9 @@ import org.eclipse.jetty.util.log.Logger;
/** Implementation of Continuation and AsyncContext interfaces
*
*/
public class AsyncContinuation implements AsyncContext, Continuation
public class HttpChannelState implements AsyncContext, Continuation
{
private static final Logger LOG = Log.getLogger(AsyncContinuation.class);
private static final Logger LOG = Log.getLogger(HttpChannelState.class);
private final static long DEFAULT_TIMEOUT=30000L;
@ -76,7 +77,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
};
/* ------------------------------------------------------------ */
protected HttpChannel _channel;
private final HttpChannel _channel;
private List<AsyncListener> _lastAsyncListeners;
private List<AsyncListener> _asyncListeners;
private List<ContinuationListener> _continuationListeners;
@ -92,8 +93,9 @@ public class AsyncContinuation implements AsyncContext, Continuation
private volatile boolean _continuation;
/* ------------------------------------------------------------ */
protected AsyncContinuation()
protected HttpChannelState(HttpChannel channel)
{
_channel=channel;
_state=State.IDLE;
_initial=true;
}
@ -105,15 +107,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
protected void setConnection(final HttpChannel connection)
{
synchronized(this)
{
_channel=connection;
}
}
/* ------------------------------------------------------------ */
@Override
public void addListener(AsyncListener listener)
{
synchronized(this)
@ -125,6 +119,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response)
{
synchronized(this)
@ -137,6 +132,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void addContinuationListener(ContinuationListener listener)
{
synchronized(this)
@ -148,6 +144,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void setTimeout(long ms)
{
synchronized(this)
@ -157,6 +154,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public long getTimeout()
{
synchronized(this)
@ -174,15 +172,11 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.continuation.Continuation#keepWrappers()
*/
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped()
*/
@Override
public boolean isResponseWrapped()
{
return _responseWrapped;
@ -192,6 +186,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/* (non-Javadoc)
* @see javax.servlet.ServletRequest#isInitial()
*/
@Override
public boolean isInitial()
{
synchronized(this)
@ -204,6 +199,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/* (non-Javadoc)
* @see javax.servlet.ServletRequest#isSuspended()
*/
@Override
public boolean isSuspended()
{
synchronized(this)
@ -385,7 +381,6 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
}
}
}
/* ------------------------------------------------------------ */
@ -442,6 +437,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void dispatch()
{
boolean dispatch=false;
@ -549,6 +545,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/* (non-Javadoc)
* @see javax.servlet.ServletRequest#complete()
*/
@Override
public void complete()
{
// just like resume, except don't set _resumed=true;
@ -583,6 +580,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public <T extends AsyncListener> T createListener(Class<T> clazz) throws ServletException
{
try
@ -630,8 +628,8 @@ public class AsyncContinuation implements AsyncContext, Continuation
{
if (ex!=null)
{
_event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_EXCEPTION,ex);
_event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_MESSAGE,ex.getMessage());
_event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex);
_event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,ex.getMessage());
listener.onError(_event);
}
else
@ -695,7 +693,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/* ------------------------------------------------------------ */
protected void scheduleDispatch()
{
// _channel.asyncDispatch();
_channel.execute(_handleRequest);
}
/* ------------------------------------------------------------ */
@ -782,6 +780,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void dispatch(ServletContext context, String path)
{
_event._dispatchContext=context;
@ -790,6 +789,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void dispatch(String path)
{
_event._path=path;
@ -803,6 +803,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public ServletRequest getRequest()
{
if (_event!=null)
@ -811,6 +812,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public ServletResponse getResponse()
{
if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
@ -819,13 +821,15 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public void start(final Runnable run)
{
final AsyncEventState event=_event;
if (event!=null)
{
_channel.getServer().getThreadPool().dispatch(new Runnable()
_channel.execute(new Runnable()
{
@Override
public void run()
{
((Context)event.getServletContext()).getContextHandler().handle(run);
@ -835,6 +839,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
}
/* ------------------------------------------------------------ */
@Override
public boolean hasOriginalRequestAndResponse()
{
synchronized (this)
@ -857,6 +862,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see Continuation#isResumed()
*/
@Override
public boolean isResumed()
{
synchronized (this)
@ -868,6 +874,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see Continuation#isExpired()
*/
@Override
public boolean isExpired()
{
synchronized (this)
@ -880,6 +887,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see Continuation#resume()
*/
@Override
public void resume()
{
dispatch();
@ -889,18 +897,19 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see Continuation#suspend()
*/
@Override
public void suspend(ServletResponse response)
{
_continuation=true;
if (response instanceof ServletResponseWrapper)
{
_responseWrapped=true;
AsyncContinuation.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response);
HttpChannelState.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response);
}
else
{
_responseWrapped=false;
AsyncContinuation.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse());
HttpChannelState.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse());
}
}
@ -908,17 +917,19 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see Continuation#suspend()
*/
@Override
public void suspend()
{
_responseWrapped=false;
_continuation=true;
AsyncContinuation.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse());
HttpChannelState.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse());
}
/* ------------------------------------------------------------ */
/**
* @see org.eclipse.jetty.continuation.Continuation#getServletResponse()
*/
@Override
public ServletResponse getServletResponse()
{
if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null)
@ -930,6 +941,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String)
*/
@Override
public Object getAttribute(String name)
{
return _channel.getRequest().getAttribute(name);
@ -939,6 +951,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String)
*/
@Override
public void removeAttribute(String name)
{
_channel.getRequest().removeAttribute(name);
@ -948,6 +961,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object)
*/
@Override
public void setAttribute(String name, Object attribute)
{
_channel.getRequest().setAttribute(name,attribute);
@ -957,6 +971,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
/**
* @see org.eclipse.jetty.continuation.Continuation#undispatch()
*/
@Override
public void undispatch()
{
if (isSuspended())
@ -976,7 +991,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
@Override
public void run()
{
AsyncContinuation.this.expired();
HttpChannelState.this.expired();
}
}
@ -991,7 +1006,7 @@ public class AsyncContinuation implements AsyncContext, Continuation
public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response)
{
super(AsyncContinuation.this, request,response);
super(HttpChannelState.this, request,response);
_suspendedContext=context;
}
@ -1015,4 +1030,13 @@ public class AsyncContinuation implements AsyncContext, Continuation
return _path;
}
}
private final Runnable _handleRequest = new Runnable()
{
@Override
public void run()
{
_channel.handleRequest();
}
};
}

View File

@ -527,8 +527,13 @@ public class HttpConnection extends AbstractAsyncConnection
@Override
public Timer getTimer()
{
// TODO Auto-generated method stub
return null;
return _connector.getTimer();
}
@Override
protected void execute(Runnable task)
{
_connector.findExecutor().execute(task);
}
private FutureCallback<Void> write(ByteBuffer b0,ByteBuffer b1,ByteBuffer b2)

View File

@ -2,6 +2,7 @@ package org.eclipse.jetty.server;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Timer;
import javax.servlet.ServletRequest;
@ -18,6 +19,7 @@ public abstract class HttpConnector extends AbstractConnector
private int _confidentialPort = 0;
private boolean _forwarded;
private String _hostHeader;
private Timer _timer = new Timer(true);
private String _forwardedHostHeader = HttpHeader.X_FORWARDED_HOST.toString();
private String _forwardedServerHeader = HttpHeader.X_FORWARDED_SERVER.toString();
@ -42,6 +44,28 @@ public abstract class HttpConnector extends AbstractConnector
super(acceptors);
}
@Override
protected void doStart() throws Exception
{
super.doStart();
_timer=new Timer("Timer-"+getName(),true);
}
@Override
protected void doStop() throws Exception
{
_timer.cancel();
_timer=null;
super.doStop();
}
public Timer getTimer()
{
return _timer;
}
public int getRequestHeaderSize()
{
return _requestHeaderSize;

View File

@ -119,7 +119,7 @@ public class Request implements HttpServletRequest
private final HttpChannel _channel;
private HttpFields _fields;
private final AsyncContinuation _async = new AsyncContinuation();
private final HttpChannelState _async;
private boolean _asyncSupported = true;
private volatile Attributes _attributes;
@ -169,8 +169,8 @@ public class Request implements HttpServletRequest
public Request(HttpChannel channel)
{
_channel = channel;
_async=channel.getState();
_fields=_channel.getRequestFields();
_async.setConnection(channel);
}
/* ------------------------------------------------------------ */
@ -311,7 +311,7 @@ public class Request implements HttpServletRequest
}
/* ------------------------------------------------------------ */
public AsyncContinuation getAsyncContinuation()
public HttpChannelState getAsyncContinuation()
{
return _async;
}

View File

@ -364,8 +364,8 @@ public class Server extends HandlerWrapper implements Attributes
*/
public void handleAsync(HttpChannel connection) throws IOException, ServletException
{
final AsyncContinuation async = connection.getRequest().getAsyncContinuation();
final AsyncContinuation.AsyncEventState state = async.getAsyncEventState();
final HttpChannelState async = connection.getRequest().getAsyncContinuation();
final HttpChannelState.AsyncEventState state = async.getAsyncEventState();
final Request baseRequest=connection.getRequest();
final String path=state.getPath();

View File

@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.http.PathMap;
import org.eclipse.jetty.server.AsyncContinuation;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HandlerContainer;
import org.eclipse.jetty.server.Request;
@ -179,7 +179,7 @@ public class ContextHandlerCollection extends HandlerCollection
if (handlers==null || handlers.length==0)
return;
AsyncContinuation async = baseRequest.getAsyncContinuation();
HttpChannelState async = baseRequest.getAsyncContinuation();
if (async.isAsync())
{
ContextHandler context=async.getContextHandler();

View File

@ -20,7 +20,7 @@ import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.server.AsyncContinuation;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Response;
@ -51,7 +51,7 @@ public class RequestLogHandler extends HandlerWrapper
public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response)
throws IOException, ServletException
{
AsyncContinuation continuation = baseRequest.getAsyncContinuation();
HttpChannelState continuation = baseRequest.getAsyncContinuation();
if (!continuation.isInitial())
{
baseRequest.setDispatchTime(System.currentTimeMillis());

View File

@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletResponse;
import org.eclipse.jetty.continuation.Continuation;
import org.eclipse.jetty.continuation.ContinuationListener;
import org.eclipse.jetty.server.AsyncContinuation;
import org.eclipse.jetty.server.HttpChannelState;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.Response;
import org.eclipse.jetty.util.statistic.CounterStatistic;
@ -53,7 +53,7 @@ public class StatisticsHandler extends HandlerWrapper
{
public void onComplete(Continuation continuation)
{
final Request request = ((AsyncContinuation)continuation).getBaseRequest();
final Request request = ((HttpChannelState)continuation).getBaseRequest();
final long elapsed = System.currentTimeMillis()-request.getTimeStamp();
_requestStats.decrement();
@ -100,7 +100,7 @@ public class StatisticsHandler extends HandlerWrapper
_dispatchedStats.increment();
final long start;
AsyncContinuation continuation = request.getAsyncContinuation();
HttpChannelState continuation = request.getAsyncContinuation();
if (continuation.isInitial())
{
// new request