From 36a156d4cffd703b7ea50f6806b38ceae9388b61 Mon Sep 17 00:00:00 2001 From: Greg Wilkins Date: Mon, 8 Jun 2009 04:41:11 +0000 Subject: [PATCH] javadoc and ContinuationThrowable git-svn-id: svn+ssh://dev.eclipse.org/svnroot/rt/org.eclipse.jetty/jetty/trunk@336 7e9141cc-0065-0410-87d8-b60c137991c4 --- VERSION.txt | 1 + .../jetty/continuation/Continuation.java | 299 +++++++++++++++--- .../continuation/ContinuationFilter.java | 26 +- .../continuation/ContinuationThrowable.java | 41 +++ .../jetty/continuation/FauxContinuation.java | 19 +- .../continuation/Jetty6Continuation.java | 19 ++ .../continuation/Servlet3Continuation.java | 14 + .../jetty/server/AsyncContinuation.java | 126 ++++---- .../eclipse/jetty/server/HttpConnection.java | 7 +- .../eclipse/jetty/servlet/ServletHandler.java | 5 + .../continuation/test/ContinuationBase.java | 28 ++ 11 files changed, 476 insertions(+), 109 deletions(-) create mode 100644 jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationThrowable.java diff --git a/VERSION.txt b/VERSION.txt index 19bf716cd59..f137ff6e5cd 100644 --- a/VERSION.txt +++ b/VERSION.txt @@ -7,6 +7,7 @@ jetty-7.0.0.M3-SNAPSHOT + 277798 Denial of Service Filter + Portable continuations for jetty6 and servlet3 + Refactored continuations to only support response wrapping + + Added ContinuationThrowable jetty-7.0.0.M2 18 May 2009 + JETTY-937 Work around Sun JVM bugs diff --git a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Continuation.java b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Continuation.java index 15b8c42dfd6..5111598a588 100644 --- a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Continuation.java +++ b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Continuation.java @@ -18,6 +18,7 @@ import javax.servlet.FilterChain; import javax.servlet.Servlet; import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; +import javax.servlet.ServletResponseWrapper; /* ------------------------------------------------------------ */ /** @@ -26,28 +27,131 @@ import javax.servlet.ServletResponse; * A continuation is a mechanism by which a HTTP Request can be suspended and * restarted after a timeout or an asynchronous event has occurred. *

- * Continuations will use the asynchronous APIs if they used by a - * webapp deployed in Jetty or a Servlet 3.0 container. For other - * containers, the {@link ContinuationFilter} may be used to - * simulate asynchronous features. + * The continuation mechanism is a portable mechansim that will work + * asychronously without additional configuration of all jetty-7, + * jetty-8 and Servlet 3.0 containers. With the addition of + * the {@link ContinuationFilter}, the mechansism will also work + * asynchronously on jetty-6 and non-asynchronously on any + * servlet 2.5 container. + *

+ * The Continuation API is a simplification of the richer async API + * provided by the servlet-3.0 and an enhancement of the continuation + * API that was introduced with jetty-6. + *

+ *

Continuation Usage

+ *

+ * A continuation object is obtained for a request by calling the + * factory method {@link ContinuationSupport#getContinuation(ServletRequest)}. + * The continuation type returned will depend on the servlet container + * being used. + *

+ *

+ * There are two distinct style of operation of the continuation API. + *

+ *

Suspend/Resume Usage

+ *

The suspend/resume style is used when a servlet and/or + * filter is used to generate the response after a asynchronous wait that is + * terminated by an asynchronous handler. + *

+ *
+ * Filter/Servlet:
+ *   // if we need to get asynchronous results
+ *   Object results = request.getAttribute("results);
+ *   if (results==null)
+ *   {
+ *     Continuation continuation = ContinuationSupport.getContinuation(request);
+ *     continuation.suspend();
+ *     myAsyncHandler.register(continuation);
+ *     return; // or continuation.undispatch();
+ *   }
+ * 
+ * async wait ...
+ * 
+ * Async Handler:
+ *   // when the waited for event happens
+ *   continuation.setAttribute("results",event);
+ *   continuation.resume();
+ *   
+ * Filter/Servlet:
+ *   // when the request is redispatched 
+ *   if (results==null)
+ *   {
+ *     ... // see above
+ *   }
+ *   else
+ *   {
+ *     response.getOutputStream().write(process(results));
+ *   }
+ * 
+ *

Suspend/Complete Usage

+ *

+ * The suspend/complete style is used when an asynchronous handler is used to + * generate the response: + *

+ *
+ * Filter/Servlet:
+ *   // when we want to enter asynchronous mode
+ *   Continuation continuation = ContinuationSupport.getContinuation(request);
+ *   continuation.suspend(response); // response may be wrapped
+ *   myAsyncHandler.register(continuation);
+ *   return; // or continuation.undispatch();
+ *
+ * Wrapping Filter:
+ *   // any filter that had wrapped the response should be implemented like:
+ *   try
+ *   {
+ *     chain.doFilter(request,wrappedResponse);
+ *   }
+ *   finally
+ *   {
+ *     if (!continuation.isResponseWrapped())
+ *       wrappedResponse.finish()
+ *     else
+ *       continuation.addContinuationListener(myCompleteListener)
+ *   }
+ *
+ * async wait ...
+ *
+ * Async Handler:
+ *   // when the async event happens
+ *   continuation.getServletResponse().getOutputStream().write(process(event));
+ *   continuation.complete()
+ * 
+ * + *

Continuation Timeout

+ *

+ * If a continuation is suspended, but neither {@link #complete()} or {@link #resume()} is + * called during the period set by {@link #setTimeout(long)}, then the continuation will + * expire and {@link #isExpired()} will return true. + *

+ *

+ * When a continuation expires, the {@link ContinuationListener#onTimeout(Continuation)} + * method is called on any {@link ContinuationListener} that has been registered via the + * {@link #addContinuationListener(ContinuationListener)} method. The onTimeout handlers + * may write a response and call {@link #complete()}. If {@link #complete()} is not called, + * then the container will redispatch the request as if {@link #resume()} had been called, + * except that {@link #isExpired()} will be true and {@link #isResumed()} will be false. *

* * @see ContinuationSupport + * @see ContinuationListener * */ public interface Continuation { public final static String ATTRIBUTE = "org.eclipse.jetty.continuation"; + /* ------------------------------------------------------------ */ /** - * Set the continuation timeout + * Set the continuation timeout. * * @param timeoutMs * The time in milliseconds to wait before expiring this - * continuation. + * continuation after a call to {@link #suspend()} or {@link #suspend(ServletResponse)} */ void setTimeout(long timeoutMs); + /* ------------------------------------------------------------ */ /** * Suspend the processing of the request and associated * {@link ServletResponse}. @@ -57,81 +161,105 @@ public interface Continuation * extended beyond the return to the container from the * {@link Servlet#service(ServletRequest, ServletResponse)} method and * {@link Filter#doFilter(ServletRequest, ServletResponse, FilterChain)} - * calls. If a request is suspended, then the container will not commit the - * associated response when the call to the filter chain and/or servlet - * service method returns to the container. Any exceptions thrown to the - * container by a filter chain and/or servlet for a suspended requests are - * silently ignored. + * calls. When a suspended request is returned to the container after + * a dispatch, then the container will not commit the associated response + * (unless an exception other than {@link ContinuationThrowable} is thrown). *

* *

* When the thread calling the filter chain and/or servlet has returned to * the container with a suspended request, the thread is freed for other - * tasks and the request is held pending either: + * tasks and the request is held until either: *

*

- * After any of the events listed above, the suspended request will be - * redispatched via the filter and servlet processing. + * Typically suspend with no arguments is uses when a call to {@link #resume()} + * is expected. If a call to {@link #complete()} is expected, then the + * {@link #suspend(ServletResponse)} method should be used instead of this method. *

* - *

- * Suspend may only be called by a thread that is within the service calling - * stack of - * {@link Filter#doFilter(ServletRequest, ServletResponse, FilterChain)} - * and/or {@link Servlet#service(ServletRequest, ServletResponse)}. A - * request that has been dispatched for error handling may not be suspended. - *

- * - * @see {@link #resume()} - * @see {@link #complete()} - * * @exception IllegalStateException - * If the calling thread is not within the calling stack of - * {@link Filter#doFilter(ServletRequest, ServletResponse, FilterChain)} - * and/or - * {@link Servlet#service(ServletRequest, ServletResponse)} - * or if the request has been dispatched for error handling. + * If the request cannot be suspended */ void suspend(); + + /* ------------------------------------------------------------ */ + /** + * Suspend the processing of the request and associated + * {@link ServletResponse}. + * + *

+ * After this method has been called, the lifecycle of the request will be + * extended beyond the return to the container from the + * {@link Servlet#service(ServletRequest, ServletResponse)} method and + * {@link Filter#doFilter(ServletRequest, ServletResponse, FilterChain)} + * calls. When a suspended request is returned to the container after + * a dispatch, then the container will not commit the associated response + * (unless an exception other than {@link ContinuationThrowable} is thrown). + *

+ *

+ * When the thread calling the filter chain and/or servlet has returned to + * the container with a suspended request, the thread is freed for other + * tasks and the request is held until either: + *

+ *

+ * Typically suspend with a response argument is uses when a call to {@link #complete()} + * is expected. If a call to {@link #resume()} is expected, then the + * {@link #suspend()} method should be used instead of this method. + *

+ *

+ * Filters that may wrap the response object should check {@link #isResponseWrapped()} + * to decide if they should destroy/finish the wrapper. If {@link #isResponseWrapped()} + * returns true, then the wrapped request has been passed to the asynchronous + * handler and the wrapper should not be destroyed/finished until after a call to + * {@link #complete()} (potentially using a {@link ContinuationListener#onComplete(Continuation)} + * listener). + * + * @param response The response to return via a call to {@link #getServletResponse()} + * @exception IllegalStateException + * If the request cannot be suspended + */ void suspend(ServletResponse response); + /* ------------------------------------------------------------ */ /** * Resume a suspended request. * *

* This method can be called by any thread that has been passed a reference - * to a suspended request. When called the request is redispatched to the - * normal filter chain and servlet processing. + * to a continuation. When called the request is redispatched to the + * normal filter chain and servlet processing with {@link #isInitial()} false. *

- * *

* If resume is called before a suspended request is returned to the * container (ie the thread that called {@link #suspend(long)} is still * within the filter chain and/or servlet service method), then the resume * does not take effect until the call to the filter chain and/or servlet * returns to the container. In this case both {@link #isSuspended()} and - * {@link isResumed()} return true. + * {@link isResumed()} return true. Multiple calls to resume are ignored. *

- * *

- * Multiple calls to resume are ignored + * Typically resume() is used after a call to {@link #suspend()} with + * no arguments. The dispatch after a resume call will use the original + * request and response objects, even if {@link #suspend(ServletResponse)} + * had been passed a wrapped response. *

* * @see {@link #suspend()} - * @exception IllegalStateException - * if the request is not suspended. + * @exception IllegalStateException if the request is not suspended. * */ void resume(); + /* ------------------------------------------------------------ */ /** * Complete a suspended request. * @@ -151,11 +279,20 @@ public interface Continuation *

* *

+ * Typically resume() is used after a call to {@link #suspend(ServletResponse)} with + * a possibly wrapped response. The async handler should use the response + * provided by {@link #getServletResponse()} to write the response before + * calling {@link #complete()}. If the request was suspended with a + * call to {@link #suspend()} then no response object will be available via + * {@link #getServletResponse()}. + *

+ * + *

* Once complete has been called and any thread calling the filter chain * and/or servlet chain has returned to the container, the request lifecycle * is complete. The container is able to recycle request objects, so it is - * not valid hold a request reference after the end of the life cycle or to - * call any request methods. + * not valid hold a request or continuation reference after the end of the + * life cycle. * * @see {@link #suspend()} * @exception IllegalStateException @@ -164,6 +301,7 @@ public interface Continuation */ void complete(); + /* ------------------------------------------------------------ */ /** * @return true after {@link #suspend(long)} has been called and before the * request has been redispatched due to being resumed, completed or @@ -171,6 +309,7 @@ public interface Continuation */ boolean isSuspended(); + /* ------------------------------------------------------------ */ /** * @return true if the request has been redispatched by a call to * {@link #resume()}. Returns false after any subsequent call to @@ -178,12 +317,14 @@ public interface Continuation */ boolean isResumed(); + /* ------------------------------------------------------------ */ /** * @return true after a request has been redispatched as the result of a * timeout. Returns false after any subsequent call to suspend. */ boolean isExpired(); + /* ------------------------------------------------------------ */ /** * @return true while the request is within the initial dispatch to the * filter chain and/or servlet. Will return false once the calling @@ -191,21 +332,83 @@ public interface Continuation * called and during any subsequent redispatch. */ boolean isInitial(); - + + /* ------------------------------------------------------------ */ /** - * @return True if {@link #keepWrappers()} has been called. + * Is the suspended response wrapped. + *

+ * Filters that wrap the response object should check this method to + * determine if they should destroy/finish the wrapped response. If + * the request was suspended with a call to {@link #suspend(ServletResponse)} + * that passed the wrapped response, then the filter should register + * a {@link ContinuationListener} to destroy/finish the wrapped response + * during a call to {@link ContinuationListener#onComplete(Continuation)}. + * @return True if {@link #suspend(ServletResponse)} has been passed a + * {@link ServletResponseWrapper} instance. */ boolean isResponseWrapped(); + + /* ------------------------------------------------------------ */ + /** + * Get the suspended response. + * @return the {@link ServletResponse} passed to {@link #suspend(ServletResponse)}. + */ ServletResponse getServletResponse(); /* ------------------------------------------------------------ */ - /** + /** + * Add a ContinuationListener. + * * @param listener */ void addContinuationListener(ContinuationListener listener); - public void removeAttribute(String name); + /* ------------------------------------------------------------ */ + /** Set a request attribute. + * This method is a convenience method to call the {@link ServletRequest#setAttribute(String, Object)} + * method on the associated request object. + * This is a thread safe call and may be called by any thread. + * @param name the attribute name + * @param attribute the attribute value + */ public void setAttribute(String name, Object attribute); + + /* ------------------------------------------------------------ */ + /** Get a request attribute. + * This method is a convenience method to call the {@link ServletRequest#getAttribute(String)} + * method on the associated request object. + * This is a thread safe call and may be called by any thread. + * @param name the attribute name + * @return the attribute value + */ public Object getAttribute(String name); + + /* ------------------------------------------------------------ */ + /** Remove a request attribute. + * This method is a convenience method to call the {@link ServletRequest#removeAttribute(String)} + * method on the associated request object. + * This is a thread safe call and may be called by any thread. + * @param name the attribute name + */ + public void removeAttribute(String name); + + /* ------------------------------------------------------------ */ + /** + * Undispatch the request. + *

+ * This method can be called on a suspended continuation in order + * to exit the dispatch to the filter/servlet by throwing a {@link ContinuationThrowable} + * which is caught either by the container or the {@link ContinuationFilter}. + * This is an alternative to simply returning from the dispatch in the case + * where filters in the filter chain may not be prepared to handle a suspended + * request. + *

+ * This method should only be used as a last resort and a normal return is a prefereable + * solution if filters can be updated to handle that case. + * + * @throws ContinuationThrowable thrown if the request is suspended. The instance of the + * exception may be reused on subsequent calls, so the stack frame may not be accurate. + */ + public void undispatch() throws ContinuationThrowable; } diff --git a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationFilter.java b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationFilter.java index f2936cd160f..0502766441b 100644 --- a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationFilter.java +++ b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationFilter.java @@ -64,7 +64,7 @@ public class ContinuationFilter implements Filter { if (_faux) { - final FauxContinuation fc = new FauxContinuation(request); + final FauxContinuation fc = new FauxContinuation(request,_debug); request.setAttribute(Continuation.ATTRIBUTE,fc); boolean complete=false; @@ -74,6 +74,10 @@ public class ContinuationFilter implements Filter { chain.doFilter(request,response); } + catch (ContinuationThrowable e) + { + debug("faux",e); + } finally { complete=fc.handleSuspension(); @@ -99,13 +103,27 @@ public class ContinuationFilter implements Filter } } else - chain.doFilter(request,response); + { + try + { + chain.doFilter(request,response); + } + catch (ContinuationThrowable e) + { + debug("caught",e); + } + } } - private void debug(String string, Exception e) + private void debug(String string, Throwable th) { if (_debug) - _context.log("DEBUG",e); + { + if (th instanceof ContinuationThrowable) + _context.log("DEBUG: "+th); + else + _context.log("DEBUG",th); + } } public void destroy() diff --git a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationThrowable.java b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationThrowable.java new file mode 100644 index 00000000000..02393fd8d0d --- /dev/null +++ b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/ContinuationThrowable.java @@ -0,0 +1,41 @@ +// ======================================================================== +// Copyright (c) 2009-2009 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== + + +package org.eclipse.jetty.continuation; + + +/* ------------------------------------------------------------ */ +/** ContinuationThrowable + *

+ * A ContinuationThrowable is throw by {@link Continuation#undispatch()} + * in order to exit the dispatch to a Filter or Servlet. Use of + * ContinuationThrowable is discouraged and it is preferable to + * allow return to be used. ContinuationThrowables should only be + * used when there is a Filter/Servlet which cannot be modified + * to avoid committing a response when {@link Continuation#isSuspended()} + * is true. + *

+ *

+ * ContinuationThrowable instances are often reused so that the + * stack trace may be entirely unrelated to the calling stack. + * A real stack trace may be obtained by enabling debug. + *

+ *

+ * ContinuationThrowable extends Error as this is more likely + * to be uncaught (or rethrown) by a Filter/Servlet. A ContinuationThrowable + * does not represent and error condition. + *

+ */ +public class ContinuationThrowable extends Error +{} \ No newline at end of file diff --git a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/FauxContinuation.java b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/FauxContinuation.java index 9bae74744e8..956149af92c 100644 --- a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/FauxContinuation.java +++ b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/FauxContinuation.java @@ -22,6 +22,8 @@ import javax.servlet.ServletResponseWrapper; class FauxContinuation implements Continuation { + private final static ContinuationThrowable __exception = new ContinuationThrowable(); + private static final int __HANDLING=1; // Request dispatched to filter/servlet private static final int __SUSPENDING=2; // Suspend called, but not yet returned to container private static final int __RESUMING=3; // resumed while suspending @@ -39,12 +41,14 @@ class FauxContinuation implements Continuation private boolean _timeout=false; private boolean _responseWrapped=false; private long _timeoutMs=30000; // TODO configure + private boolean _debug; private ArrayList _listeners; - FauxContinuation(final ServletRequest request) + FauxContinuation(final ServletRequest request, boolean debug) { _request=request; + _debug=debug; } /* ------------------------------------------------------------ */ @@ -456,6 +460,15 @@ class FauxContinuation implements Continuation { _request.setAttribute(name,attribute); } - - + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#undispatch() + */ + public void undispatch() + { + if (isSuspended()) + throw _debug?new ContinuationThrowable():__exception; + throw new IllegalStateException("!suspended"); + } } \ No newline at end of file diff --git a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Jetty6Continuation.java b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Jetty6Continuation.java index 1a276b6c89b..08afbd8dda4 100644 --- a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Jetty6Continuation.java +++ b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Jetty6Continuation.java @@ -159,6 +159,25 @@ public class Jetty6Continuation implements ContinuationFilter.PartialContinuatio return _responseWrapped; } + + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#undispatch() + */ + public void undispatch() + { + Throwable th=_retry; + if (th instanceof ThreadDeath) + throw (ThreadDeath)th; + if (th instanceof Error) + throw (Error)th; + if (th instanceof RuntimeException) + throw (RuntimeException)th; + + throw new IllegalStateException("!suspended"); + } + public boolean enter() { _expired=!_j6Continuation.isResumed(); diff --git a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Servlet3Continuation.java b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Servlet3Continuation.java index b0188329a96..e5fa239e09e 100644 --- a/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Servlet3Continuation.java +++ b/jetty-continuation/src/main/java/org/eclipse/jetty/continuation/Servlet3Continuation.java @@ -13,6 +13,8 @@ import javax.servlet.ServletResponseWrapper; public class Servlet3Continuation implements Continuation { + private final static ContinuationThrowable __exception = new ContinuationThrowable(); + private final ServletRequest _request; private ServletResponse _response; private AsyncContext _context; @@ -160,4 +162,16 @@ public class Servlet3Continuation implements Continuation { _request.setAttribute(name,attribute); } + + + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#undispatch() + */ + public void undispatch() + { + if (isSuspended()) + throw __exception; + throw new IllegalStateException("!suspended"); + } } diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java index a181adc0ba6..1b484799201 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/AsyncContinuation.java @@ -18,6 +18,7 @@ import javax.servlet.ServletRequest; import javax.servlet.ServletResponse; import javax.servlet.ServletResponseWrapper; +import org.eclipse.jetty.continuation.ContinuationThrowable; import org.eclipse.jetty.continuation.ContinuationListener; import org.eclipse.jetty.continuation.Continuation; import org.eclipse.jetty.io.AsyncEndPoint; @@ -34,28 +35,31 @@ import org.eclipse.jetty.util.thread.Timeout; */ public class AsyncContinuation implements AsyncContext, Continuation { + private final static ContinuationThrowable __exception = new ContinuationThrowable(); + // STATES: + // handling() suspend() unhandle() resume() complete() doComplete() + // startAsync() dispatch() + // IDLE DISPATCHED + // DISPATCHED ASYNCSTARTED UNCOMPLETED + // ASYNCSTARTED ASYNCWAIT REDISPATCHING COMPLETING + // REDISPATCHING REDISPATCHED + // ASYNCWAIT REDISPATCH COMPLETING + // REDISPATCH REDISPATCHED + // REDISPATCHED ASYNCSTARTED UNCOMPLETED + // COMPLETING UNCOMPLETED UNCOMPLETED + // UNCOMPLETED COMPLETED + // COMPLETED private static final int __IDLE=0; // Idle request private static final int __DISPATCHED=1; // Request dispatched to filter/servlet - private static final int __SUSPENDING=2; // Suspend called, but not yet returned to container + private static final int __ASYNCSTARTED=2; // Suspend called, but not yet returned to container private static final int __REDISPATCHING=3;// resumed while dispatched - private static final int __SUSPENDED=4; // Suspended and parked - private static final int __UNSUSPENDING=5; // Has been scheduled + private static final int __ASYNCWAIT=4; // Suspended and parked + private static final int __REDISPATCH=5; // Has been scheduled private static final int __REDISPATCHED=6; // Request redispatched to filter/servlet private static final int __COMPLETING=7; // complete while dispatched private static final int __UNCOMPLETED=8; // Request is completable - private static final int __COMPLETE=9; // Request is complete - - // State table - // __HANDLE __UNHANDLE __SUSPEND __REDISPATCH - // IDLE */ { __DISPATCHED, __Illegal, __Illegal, __Illegal }, - // DISPATCHED */ { __Illegal, __UNCOMPLETED, __SUSPENDING, __Ignore }, - // SUSPENDING */ { __Illegal, __SUSPENDED, __Illegal,__REDISPATCHING }, - // REDISPATCHING */ { __Illegal, _REDISPATCHED, __Ignored, __Ignore }, - // COMPLETING */ { __Illegal, __UNCOMPLETED, __Illegal, __Illegal }, - // SUSPENDED */ { __REDISPATCHED, __Illegal, __Illegal, __UNSUSPENDING }, - // UNSUSPENDING */ { __REDISPATCHED, __Illegal, __Illegal, __Ignore }, - // REDISPATCHED */ { __Illegal, __UNCOMPLETED, __SUSPENDING, __Ignore }, + private static final int __COMPLETED=9; // Request is complete /* ------------------------------------------------------------ */ @@ -169,10 +173,10 @@ public class AsyncContinuation implements AsyncContext, Continuation { switch(_state) { - case __SUSPENDING: + case __ASYNCSTARTED: case __REDISPATCHING: case __COMPLETING: - case __SUSPENDED: + case __ASYNCWAIT: return true; default: @@ -198,14 +202,14 @@ public class AsyncContinuation implements AsyncContext, Continuation return ((_state==__IDLE)?"IDLE": (_state==__DISPATCHED)?"DISPATCHED": - (_state==__SUSPENDING)?"SUSPENDING": - (_state==__SUSPENDED)?"SUSPENDED": + (_state==__ASYNCSTARTED)?"ASYNCSTARTED": + (_state==__ASYNCWAIT)?"ASYNCWAIT": (_state==__REDISPATCHING)?"REDISPATCHING": - (_state==__UNSUSPENDING)?"UNSUSPENDING": + (_state==__REDISPATCH)?"REDISPATCH": (_state==__REDISPATCHED)?"REDISPATCHED": (_state==__COMPLETING)?"COMPLETING": (_state==__UNCOMPLETED)?"UNCOMPLETED": - (_state==__COMPLETE)?"COMPLETE": + (_state==__COMPLETED)?"COMPLETE": ("UNKNOWN?"+_state))+ (_initial?",initial":"")+ (_resumed?",resumed":"")+ @@ -230,7 +234,7 @@ public class AsyncContinuation implements AsyncContext, Continuation { case __DISPATCHED: case __REDISPATCHED: - case __COMPLETE: + case __COMPLETED: throw new IllegalStateException(this.getStatusString()); case __IDLE: @@ -238,7 +242,7 @@ public class AsyncContinuation implements AsyncContext, Continuation _state=__DISPATCHED; return true; - case __SUSPENDING: + case __ASYNCSTARTED: case __REDISPATCHING: throw new IllegalStateException(this.getStatusString()); @@ -246,10 +250,10 @@ public class AsyncContinuation implements AsyncContext, Continuation _state=__UNCOMPLETED; return false; - case __SUSPENDED: + case __ASYNCWAIT: return false; - case __UNSUSPENDING: + case __REDISPATCH: _state=__REDISPATCHED; return true; @@ -286,20 +290,20 @@ public class AsyncContinuation implements AsyncContext, Continuation { case __DISPATCHED: case __REDISPATCHED: - _state=__SUSPENDING; + _state=__ASYNCSTARTED; return; case __IDLE: throw new IllegalStateException(this.getStatusString()); - case __SUSPENDING: + case __ASYNCSTARTED: case __REDISPATCHING: return; case __COMPLETING: - case __SUSPENDED: - case __UNSUSPENDING: - case __COMPLETE: + case __ASYNCWAIT: + case __REDISPATCH: + case __COMPLETED: throw new IllegalStateException(this.getStatusString()); default: @@ -331,11 +335,11 @@ public class AsyncContinuation implements AsyncContext, Continuation case __IDLE: throw new IllegalStateException(this.getStatusString()); - case __SUSPENDING: + case __ASYNCSTARTED: _initial=false; - _state=__SUSPENDED; + _state=__ASYNCWAIT; scheduleTimeout(); // could block and change state. - if (_state==__SUSPENDED) + if (_state==__ASYNCWAIT) return true; else if (_state==__COMPLETING) { @@ -356,8 +360,8 @@ public class AsyncContinuation implements AsyncContext, Continuation _state=__UNCOMPLETED; return true; - case __SUSPENDED: - case __UNSUSPENDING: + case __ASYNCWAIT: + case __REDISPATCH: default: throw new IllegalStateException(this.getStatusString()); } @@ -378,22 +382,22 @@ public class AsyncContinuation implements AsyncContext, Continuation case __IDLE: case __REDISPATCHING: case __COMPLETING: - case __COMPLETE: + case __COMPLETED: case __UNCOMPLETED: return; - case __SUSPENDING: + case __ASYNCSTARTED: _state=__REDISPATCHING; _resumed=true; return; - case __SUSPENDED: + case __ASYNCWAIT: dispatch=!_expired; - _state=__UNSUSPENDING; + _state=__REDISPATCH; _resumed=true; break; - case __UNSUSPENDING: + case __REDISPATCH: return; default: @@ -417,8 +421,8 @@ public class AsyncContinuation implements AsyncContext, Continuation // _history.append('E'); switch(_state) { - case __SUSPENDING: - case __SUSPENDED: + case __ASYNCSTARTED: + case __ASYNCWAIT: listeners=_listeners; break; default: @@ -453,8 +457,8 @@ public class AsyncContinuation implements AsyncContext, Continuation // _history.append('e'); switch(_state) { - case __SUSPENDING: - case __SUSPENDED: + case __ASYNCSTARTED: + case __ASYNCWAIT: dispatch(); } } @@ -476,21 +480,21 @@ public class AsyncContinuation implements AsyncContext, Continuation switch(_state) { case __IDLE: - case __COMPLETE: + case __COMPLETED: case __REDISPATCHING: case __COMPLETING: - case __UNSUSPENDING: + case __REDISPATCH: return; case __DISPATCHED: case __REDISPATCHED: throw new IllegalStateException(this.getStatusString()); - case __SUSPENDING: + case __ASYNCSTARTED: _state=__COMPLETING; return; - case __SUSPENDED: + case __ASYNCWAIT: _state=__COMPLETING; dispatch=!_expired; break; @@ -521,7 +525,7 @@ public class AsyncContinuation implements AsyncContext, Continuation switch(_state) { case __UNCOMPLETED: - _state=__COMPLETE; + _state=__COMPLETED; listeners=_listeners; break; @@ -670,7 +674,7 @@ public class AsyncContinuation implements AsyncContext, Continuation { synchronized (this) { - return _state==__COMPLETE; + return _state==__COMPLETED; } } @@ -682,10 +686,10 @@ public class AsyncContinuation implements AsyncContext, Continuation { switch(_state) { - case __SUSPENDING: + case __ASYNCSTARTED: case __REDISPATCHING: - case __UNSUSPENDING: - case __SUSPENDED: + case __REDISPATCH: + case __ASYNCWAIT: return true; default: @@ -878,6 +882,22 @@ public class AsyncContinuation implements AsyncContext, Continuation _connection.getRequest().setAttribute(name,attribute); } + /* ------------------------------------------------------------ */ + /** + * @see org.eclipse.jetty.continuation.Continuation#undispatch() + */ + public void undispatch() + { + if (isSuspended()) + { + if (Log.isDebugEnabled()) + throw new ContinuationThrowable(); + else + throw __exception; + } + throw new IllegalStateException("!suspended"); + } + /* ------------------------------------------------------------ */ /* ------------------------------------------------------------ */ public class AsyncEventState diff --git a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java index 8bbac4ebcaf..076eca51e1a 100644 --- a/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java +++ b/jetty-server/src/main/java/org/eclipse/jetty/server/HttpConnection.java @@ -21,6 +21,7 @@ import javax.servlet.ServletInputStream; import javax.servlet.ServletOutputStream; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.continuation.ContinuationThrowable; import org.eclipse.jetty.http.AbstractGenerator; import org.eclipse.jetty.http.EncodedHttpURI; import org.eclipse.jetty.http.Generator; @@ -547,9 +548,13 @@ public class HttpConnection implements Connection server.handleAsync(this); } } + catch (ContinuationThrowable e) + { + Log.debug(e); + } catch (EofException e) { - Log.ignore(e); + Log.debug(e); error=true; } catch (HttpException e) diff --git a/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java b/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java index a726cbfb468..9e88a275a52 100644 --- a/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java +++ b/jetty-servlet/src/main/java/org/eclipse/jetty/servlet/ServletHandler.java @@ -34,6 +34,7 @@ import javax.servlet.UnavailableException; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; +import org.eclipse.jetty.continuation.ContinuationThrowable; import org.eclipse.jetty.http.PathMap; import org.eclipse.jetty.io.EofException; import org.eclipse.jetty.io.HttpException; @@ -493,6 +494,10 @@ public class ServletHandler extends ScopedHandler else if(Log.isDebugEnabled())Log.debug("Response already committed for handling "+th); } + catch(ContinuationThrowable e) + { + throw e; + } catch(Error e) { if (!(DispatcherType.REQUEST.equals(type) || DispatcherType.ASYNC.equals(type))) diff --git a/test-continuation/src/main/java/org/eclipse/jetty/continuation/test/ContinuationBase.java b/test-continuation/src/main/java/org/eclipse/jetty/continuation/test/ContinuationBase.java index d2ae6a96ff9..5089c0dbb59 100644 --- a/test-continuation/src/main/java/org/eclipse/jetty/continuation/test/ContinuationBase.java +++ b/test-continuation/src/main/java/org/eclipse/jetty/continuation/test/ContinuationBase.java @@ -136,6 +136,26 @@ public abstract class ContinuationBase extends TestCase assertEquals(1,count(response,"history: onComplete")); assertContains("TIMEOUT",response); + + response=process("suspend=200&resume=10&undispatch=true",null); + assertContains("RESUMED",response); + assertNotContains("history: onTimeout",response); + assertContains("history: onComplete",response); + + response=process("suspend=200&resume=0&undispatch=true",null); + assertContains("RESUMED",response); + assertNotContains("history: onTimeout",response); + assertContains("history: onComplete",response); + + response=process("suspend=200&complete=10&undispatch=true",null); + assertContains("COMPLETED",response); + assertNotContains("history: onTimeout",response); + assertContains("history: onComplete",response); + + response=process("suspend=200&complete=0&undispatch=true",null); + assertContains("COMPLETED",response); + assertNotContains("history: onTimeout",response); + assertContains("history: onComplete",response); } @@ -218,6 +238,7 @@ public abstract class ContinuationBase extends TestCase long resume2_after=-1; long complete_after=-1; long complete2_after=-1; + boolean undispatch=false; if (request.getParameter("read")!=null) read_before=Integer.parseInt(request.getParameter("read")); @@ -235,6 +256,8 @@ public abstract class ContinuationBase extends TestCase complete_after=Integer.parseInt(request.getParameter("complete")); if (request.getParameter("complete2")!=null) complete2_after=Integer.parseInt(request.getParameter("complete2")); + if (request.getParameter("undispatch")!=null) + undispatch=Boolean.parseBoolean(request.getParameter("undispatch")); if (continuation.isInitial()) { @@ -308,6 +331,9 @@ public abstract class ContinuationBase extends TestCase ((HttpServletResponse)continuation.getServletResponse()).addHeader("history","resume"); continuation.resume(); } + + if (undispatch) + continuation.undispatch(); } else if (sleep_for>=0) { @@ -387,6 +413,8 @@ public abstract class ContinuationBase extends TestCase ((HttpServletResponse)response).addHeader("history","resume"); continuation.resume(); } + if (undispatch) + continuation.undispatch(); return; } else if (continuation.isExpired())