From c8df5b3477e96226c3afa1c40f35ce78ab8e00d5 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Thu, 17 May 2012 16:26:11 +0200 Subject: [PATCH] jetty-9 Wired up AsyncContext and Continuations --- .../org/eclipse/jetty/server/HttpChannel.java | 60 ++++++++------ ...ontinuation.java => HttpChannelState.java} | 80 ++++++++++++------- .../eclipse/jetty/server/HttpConnection.java | 9 ++- .../eclipse/jetty/server/HttpConnector.java | 24 ++++++ .../org/eclipse/jetty/server/Request.java | 6 +- .../java/org/eclipse/jetty/server/Server.java | 4 +- .../handler/ContextHandlerCollection.java | 4 +- .../server/handler/RequestLogHandler.java | 4 +- .../server/handler/StatisticsHandler.java | 6 +- 9 files changed, 130 insertions(+), 67 deletions(-) rename jetty-server/src/main/java/org/eclipse/jetty/server/{AsyncContinuation.java => HttpChannelState.java} (94%) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java index 8730643f938..89418bfbf65 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannel.java @@ -40,7 +40,6 @@ import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MimeTypes; import org.eclipse.jetty.io.AsyncConnection; import org.eclipse.jetty.io.EofException; -import org.eclipse.jetty.io.RuntimeIOException; import org.eclipse.jetty.io.UncheckedPrintWriter; import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.StringUtil; @@ -70,26 +69,27 @@ public abstract class HttpChannel __currentChannel.set(channel); } - private int _requests; + private final Server _server; private final AsyncConnection _connection; private final HttpURI _uri; + private final ChannelEventHandler _handler = new ChannelEventHandler(); + private final HttpChannelState _state; + private final HttpFields _requestFields; private final Request _request; - private final AsyncContinuation _state; + private final HttpInput _in; private final HttpFields _responseFields; private final Response _response; - - - private final HttpInput _in; private final Output _out; - private volatile HttpWriter _writer; - private volatile PrintWriter _printWriter; + private HttpWriter _writer; + private PrintWriter _printWriter; - int _include; + private int _requests; + private int _include; private HttpVersion _version = HttpVersion.HTTP_1_1; @@ -97,11 +97,8 @@ public abstract class HttpChannel private boolean _expect100Continue = false; private boolean _expect102Processing = false; private boolean _host = false; - - private final RequestHandler _handler = new RequestHandler(); - /* ------------------------------------------------------------ */ /** Constructor * @@ -113,16 +110,17 @@ public abstract class HttpChannel _uri = new HttpURI(URIUtil.__CHARSET); _requestFields = new HttpFields(); _responseFields = new HttpFields(server.getMaxCookieVersion()); + _state = new HttpChannelState(this); _request = new Request(this); _response = new Response(this); - _state = _request.getAsyncContinuation(); _in=input; _out=new Output(); } - - public interface EventHandler extends HttpParser.RequestHandler + + /* ------------------------------------------------------------ */ + public HttpChannelState getState() { - ResponseInfo commit(); + return _state; } /* ------------------------------------------------------------ */ @@ -153,11 +151,6 @@ public abstract class HttpChannel } - /* ------------------------------------------------------------ */ - public AsyncContinuation getAsyncContinuation() - { - return _state; - } /* ------------------------------------------------------------ */ /** @@ -302,7 +295,7 @@ public abstract class HttpChannel } /* ------------------------------------------------------------ */ - protected void handleRequest() throws IOException + protected void handleRequest() { LOG.debug("{} handleRequest",this); @@ -397,9 +390,16 @@ public abstract class HttpChannel } if (!_response.isCommitted() && !_request.isHandled()) - _response.sendError(HttpServletResponse.SC_NOT_FOUND); + sendError(HttpServletResponse.SC_NOT_FOUND,null,null,false); - _response.complete(); + try + { + _response.complete(); + } + catch(IOException e) + { + LOG.debug(e); + } _request.setHandled(true); completed(); } @@ -498,7 +498,7 @@ public abstract class HttpChannel /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ - private class RequestHandler implements EventHandler + private class ChannelEventHandler implements EventHandler { @Override public boolean startRequest(HttpMethod httpMethod,String method, String uri, HttpVersion version) @@ -772,7 +772,17 @@ public abstract class HttpChannel protected abstract void completed(); + protected abstract void execute(Runnable task); + public abstract Timer getTimer(); + + /* ------------------------------------------------------------ */ + public interface EventHandler extends HttpParser.RequestHandler + { + ResponseInfo commit(); + } + + } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java similarity index 94% rename from jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java rename to jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java index 9741779dfe9..b8699e083f0 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpChannelState.java @@ -21,6 +21,7 @@ import java.util.TimerTask; import javax.servlet.AsyncContext; import javax.servlet.AsyncEvent; import javax.servlet.AsyncListener; +import javax.servlet.RequestDispatcher; import javax.servlet.ServletContext; import javax.servlet.ServletException; import javax.servlet.ServletRequest; @@ -39,9 +40,9 @@ import org.eclipse.jetty.util.log.Logger; /** Implementation of Continuation and AsyncContext interfaces * */ -public class AsyncContinuation implements AsyncContext, Continuation +public class HttpChannelState implements AsyncContext, Continuation { - private static final Logger LOG = Log.getLogger(AsyncContinuation.class); + private static final Logger LOG = Log.getLogger(HttpChannelState.class); private final static long DEFAULT_TIMEOUT=30000L; @@ -76,7 +77,7 @@ public class AsyncContinuation implements AsyncContext, Continuation }; /* ------------------------------------------------------------ */ - protected HttpChannel _channel; + private final HttpChannel _channel; private List _lastAsyncListeners; private List _asyncListeners; private List _continuationListeners; @@ -92,8 +93,9 @@ public class AsyncContinuation implements AsyncContext, Continuation private volatile boolean _continuation; /* ------------------------------------------------------------ */ - protected AsyncContinuation() + protected HttpChannelState(HttpChannel channel) { + _channel=channel; _state=State.IDLE; _initial=true; } @@ -105,15 +107,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ - protected void setConnection(final HttpChannel connection) - { - synchronized(this) - { - _channel=connection; - } - } - - /* ------------------------------------------------------------ */ + @Override public void addListener(AsyncListener listener) { synchronized(this) @@ -125,6 +119,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void addListener(AsyncListener listener,ServletRequest request, ServletResponse response) { synchronized(this) @@ -137,6 +132,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void addContinuationListener(ContinuationListener listener) { synchronized(this) @@ -148,6 +144,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void setTimeout(long ms) { synchronized(this) @@ -157,6 +154,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public long getTimeout() { synchronized(this) @@ -174,15 +172,11 @@ public class AsyncContinuation implements AsyncContext, Continuation } } - /* ------------------------------------------------------------ */ - /** - * @see org.eclipse.jetty.continuation.Continuation#keepWrappers() - */ - /* ------------------------------------------------------------ */ /** * @see org.eclipse.jetty.continuation.Continuation#isResponseWrapped() */ + @Override public boolean isResponseWrapped() { return _responseWrapped; @@ -192,6 +186,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /* (non-Javadoc) * @see javax.servlet.ServletRequest#isInitial() */ + @Override public boolean isInitial() { synchronized(this) @@ -204,6 +199,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /* (non-Javadoc) * @see javax.servlet.ServletRequest#isSuspended() */ + @Override public boolean isSuspended() { synchronized(this) @@ -385,7 +381,6 @@ public class AsyncContinuation implements AsyncContext, Continuation } } } - } /* ------------------------------------------------------------ */ @@ -442,6 +437,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void dispatch() { boolean dispatch=false; @@ -549,6 +545,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /* (non-Javadoc) * @see javax.servlet.ServletRequest#complete() */ + @Override public void complete() { // just like resume, except don't set _resumed=true; @@ -583,6 +580,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public T createListener(Class clazz) throws ServletException { try @@ -630,8 +628,8 @@ public class AsyncContinuation implements AsyncContext, Continuation { if (ex!=null) { - _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_EXCEPTION,ex); - _event.getSuppliedRequest().setAttribute(Dispatcher.ERROR_MESSAGE,ex.getMessage()); + _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex); + _event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,ex.getMessage()); listener.onError(_event); } else @@ -695,7 +693,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /* ------------------------------------------------------------ */ protected void scheduleDispatch() { - // _channel.asyncDispatch(); + _channel.execute(_handleRequest); } /* ------------------------------------------------------------ */ @@ -782,6 +780,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void dispatch(ServletContext context, String path) { _event._dispatchContext=context; @@ -790,6 +789,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void dispatch(String path) { _event._path=path; @@ -803,6 +803,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public ServletRequest getRequest() { if (_event!=null) @@ -811,6 +812,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public ServletResponse getResponse() { if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) @@ -819,13 +821,15 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public void start(final Runnable run) { final AsyncEventState event=_event; if (event!=null) { - _channel.getServer().getThreadPool().dispatch(new Runnable() + _channel.execute(new Runnable() { + @Override public void run() { ((Context)event.getServletContext()).getContextHandler().handle(run); @@ -835,6 +839,7 @@ public class AsyncContinuation implements AsyncContext, Continuation } /* ------------------------------------------------------------ */ + @Override public boolean hasOriginalRequestAndResponse() { synchronized (this) @@ -857,6 +862,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see Continuation#isResumed() */ + @Override public boolean isResumed() { synchronized (this) @@ -868,6 +874,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see Continuation#isExpired() */ + @Override public boolean isExpired() { synchronized (this) @@ -880,6 +887,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see Continuation#resume() */ + @Override public void resume() { dispatch(); @@ -889,18 +897,19 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see Continuation#suspend() */ + @Override public void suspend(ServletResponse response) { _continuation=true; if (response instanceof ServletResponseWrapper) { _responseWrapped=true; - AsyncContinuation.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response); + HttpChannelState.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),response); } else { _responseWrapped=false; - AsyncContinuation.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); + HttpChannelState.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); } } @@ -908,17 +917,19 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see Continuation#suspend() */ + @Override public void suspend() { _responseWrapped=false; _continuation=true; - AsyncContinuation.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); + HttpChannelState.this.suspend(_channel.getRequest().getServletContext(),_channel.getRequest(),_channel.getResponse()); } /* ------------------------------------------------------------ */ /** * @see org.eclipse.jetty.continuation.Continuation#getServletResponse() */ + @Override public ServletResponse getServletResponse() { if (_responseWrapped && _event!=null && _event.getSuppliedResponse()!=null) @@ -930,6 +941,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see org.eclipse.jetty.continuation.Continuation#getAttribute(java.lang.String) */ + @Override public Object getAttribute(String name) { return _channel.getRequest().getAttribute(name); @@ -939,6 +951,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see org.eclipse.jetty.continuation.Continuation#removeAttribute(java.lang.String) */ + @Override public void removeAttribute(String name) { _channel.getRequest().removeAttribute(name); @@ -948,6 +961,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see org.eclipse.jetty.continuation.Continuation#setAttribute(java.lang.String, java.lang.Object) */ + @Override public void setAttribute(String name, Object attribute) { _channel.getRequest().setAttribute(name,attribute); @@ -957,6 +971,7 @@ public class AsyncContinuation implements AsyncContext, Continuation /** * @see org.eclipse.jetty.continuation.Continuation#undispatch() */ + @Override public void undispatch() { if (isSuspended()) @@ -976,7 +991,7 @@ public class AsyncContinuation implements AsyncContext, Continuation @Override public void run() { - AsyncContinuation.this.expired(); + HttpChannelState.this.expired(); } } @@ -991,7 +1006,7 @@ public class AsyncContinuation implements AsyncContext, Continuation public AsyncEventState(ServletContext context, ServletRequest request, ServletResponse response) { - super(AsyncContinuation.this, request,response); + super(HttpChannelState.this, request,response); _suspendedContext=context; } @@ -1015,4 +1030,13 @@ public class AsyncContinuation implements AsyncContext, Continuation return _path; } } + + private final Runnable _handleRequest = new Runnable() + { + @Override + public void run() + { + _channel.handleRequest(); + } + }; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 3c1027dea46..c2d087b5240 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -527,8 +527,13 @@ public class HttpConnection extends AbstractAsyncConnection @Override public Timer getTimer() { - // TODO Auto-generated method stub - return null; + return _connector.getTimer(); + } + + @Override + protected void execute(Runnable task) + { + _connector.findExecutor().execute(task); } private FutureCallback write(ByteBuffer b0,ByteBuffer b1,ByteBuffer b2) diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java index 7fc5cf8e5d2..d59595065c7 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnector.java @@ -2,6 +2,7 @@ package org.eclipse.jetty.server; import java.io.IOException; import java.net.InetSocketAddress; +import java.util.Timer; import javax.servlet.ServletRequest; @@ -18,6 +19,7 @@ public abstract class HttpConnector extends AbstractConnector private int _confidentialPort = 0; private boolean _forwarded; private String _hostHeader; + private Timer _timer = new Timer(true); private String _forwardedHostHeader = HttpHeader.X_FORWARDED_HOST.toString(); private String _forwardedServerHeader = HttpHeader.X_FORWARDED_SERVER.toString(); @@ -41,7 +43,29 @@ public abstract class HttpConnector extends AbstractConnector { super(acceptors); } + + + @Override + protected void doStart() throws Exception + { + super.doStart(); + _timer=new Timer("Timer-"+getName(),true); + } + @Override + protected void doStop() throws Exception + { + _timer.cancel(); + _timer=null; + super.doStop(); + } + + + public Timer getTimer() + { + return _timer; + } + public int getRequestHeaderSize() { return _requestHeaderSize; diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java index fc4ff74e133..a14209cd4e5 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Request.java @@ -119,7 +119,7 @@ public class Request implements HttpServletRequest private final HttpChannel _channel; private HttpFields _fields; - private final AsyncContinuation _async = new AsyncContinuation(); + private final HttpChannelState _async; private boolean _asyncSupported = true; private volatile Attributes _attributes; @@ -169,8 +169,8 @@ public class Request implements HttpServletRequest public Request(HttpChannel channel) { _channel = channel; + _async=channel.getState(); _fields=_channel.getRequestFields(); - _async.setConnection(channel); } /* ------------------------------------------------------------ */ @@ -311,7 +311,7 @@ public class Request implements HttpServletRequest } /* ------------------------------------------------------------ */ - public AsyncContinuation getAsyncContinuation() + public HttpChannelState getAsyncContinuation() { return _async; } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java index 38f33c34801..37637888e0c 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/Server.java @@ -364,8 +364,8 @@ public class Server extends HandlerWrapper implements Attributes */ public void handleAsync(HttpChannel connection) throws IOException, ServletException { - final AsyncContinuation async = connection.getRequest().getAsyncContinuation(); - final AsyncContinuation.AsyncEventState state = async.getAsyncEventState(); + final HttpChannelState async = connection.getRequest().getAsyncContinuation(); + final HttpChannelState.AsyncEventState state = async.getAsyncEventState(); final Request baseRequest=connection.getRequest(); final String path=state.getPath(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandlerCollection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandlerCollection.java index 1926a5e8a91..7d99b1007f1 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandlerCollection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/ContextHandlerCollection.java @@ -22,7 +22,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.PathMap; -import org.eclipse.jetty.server.AsyncContinuation; +import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.Handler; import org.eclipse.jetty.server.HandlerContainer; import org.eclipse.jetty.server.Request; @@ -179,7 +179,7 @@ public class ContextHandlerCollection extends HandlerCollection if (handlers==null || handlers.length==0) return; - AsyncContinuation async = baseRequest.getAsyncContinuation(); + HttpChannelState async = baseRequest.getAsyncContinuation(); if (async.isAsync()) { ContextHandler context=async.getContextHandler(); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/RequestLogHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/RequestLogHandler.java index 66a6a93887e..f4e1480fdc3 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/RequestLogHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/RequestLogHandler.java @@ -20,7 +20,7 @@ import javax.servlet.ServletException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; -import org.eclipse.jetty.server.AsyncContinuation; +import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.RequestLog; import org.eclipse.jetty.server.Response; @@ -51,7 +51,7 @@ public class RequestLogHandler extends HandlerWrapper public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException { - AsyncContinuation continuation = baseRequest.getAsyncContinuation(); + HttpChannelState continuation = baseRequest.getAsyncContinuation(); if (!continuation.isInitial()) { baseRequest.setDispatchTime(System.currentTimeMillis()); diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java index 1b1cd3afaf9..f8e232b54e2 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/handler/StatisticsHandler.java @@ -23,7 +23,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.continuation.ContinuationListener; -import org.eclipse.jetty.server.AsyncContinuation; +import org.eclipse.jetty.server.HttpChannelState; import org.eclipse.jetty.server.Request; import org.eclipse.jetty.server.Response; import org.eclipse.jetty.util.statistic.CounterStatistic; @@ -53,7 +53,7 @@ public class StatisticsHandler extends HandlerWrapper { public void onComplete(Continuation continuation) { - final Request request = ((AsyncContinuation)continuation).getBaseRequest(); + final Request request = ((HttpChannelState)continuation).getBaseRequest(); final long elapsed = System.currentTimeMillis()-request.getTimeStamp(); _requestStats.decrement(); @@ -100,7 +100,7 @@ public class StatisticsHandler extends HandlerWrapper _dispatchedStats.increment(); final long start; - AsyncContinuation continuation = request.getAsyncContinuation(); + HttpChannelState continuation = request.getAsyncContinuation(); if (continuation.isInitial()) { // new request