474634 - AsyncListener.onError() handling.

Interim work on getting the right behavior for onError().
This commit is contained in:
Simone Bordet 2015-08-13 01:33:28 +02:00
parent 4c7d5f661e
commit 79086f3fe3
5 changed files with 638 additions and 408 deletions

View File

@ -77,7 +77,7 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
{
return _context;
}
public Context getContext()
{
return _context;
@ -100,12 +100,12 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
{
return _dispatchPath;
}
public void setTimeoutTask(Scheduler.Task task)
{
_timeoutTask = task;
}
public void cancelTimeoutTask()
{
Scheduler.Task task=_timeoutTask;
@ -119,28 +119,28 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
{
return _asyncContext;
}
@Override
public Throwable getThrowable()
{
return _throwable;
}
public void setThrowable(Throwable throwable)
{
_throwable=throwable;
}
// public void setThrowable(Throwable throwable)
// {
// _throwable=throwable;
// }
public void setDispatchContext(ServletContext context)
{
_dispatchContext=context;
}
public void setDispatchPath(String path)
{
_dispatchPath=path;
}
public void completed()
{
_timeoutTask=null;
@ -158,7 +158,15 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
Scheduler.Task task=_timeoutTask;
_timeoutTask=null;
if (task!=null)
_state.expired();
_state.onTimeout();
}
public void addThrowable(Throwable e)
{
if (_throwable==null)
_throwable=e;
else
_throwable.addSuppressed(e);
}
}

View File

@ -116,7 +116,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
return _written;
}
/**
* @return the number of requests handled by this connection
*/
@ -232,8 +232,8 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
* If the associated response has the Expect header set to 100 Continue,
* then accessing the input stream indicates that the handler/servlet
* is ready for the request body and thus a 100 Continue response is sent.
*
* @param available estimate of the number of bytes that are available
*
* @param available estimate of the number of bytes that are available
* @throws IOException if the InputStream cannot be created
*/
public void continue100(int available) throws IOException
@ -270,179 +270,201 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
LOG.debug("{} handle {} ", this,_request.getHttpURI());
HttpChannelState.Action action = _state.handling();
try
// Loop here to handle async request redispatches.
// The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called.
while (!getServer().isStopped())
{
// Loop here to handle async request redispatches.
// The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called.
loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && !getServer().isStopped())
// Early exit out of the loop if the action
// is a terminal one to skip unhandle().
if (action == Action.TERMINATED || action == Action.WAIT)
break;
boolean error=false;
try
{
boolean error=false;
try
if (LOG.isDebugEnabled())
LOG.debug("{} action {}",this,action);
switch(action)
{
if (LOG.isDebugEnabled())
LOG.debug("{} action {}",this,action);
switch(action)
case DISPATCH:
{
case REQUEST_DISPATCH:
if (!_request.hasMetaData())
throw new IllegalStateException("state="+_state);
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.REQUEST);
if (!_request.hasMetaData())
throw new IllegalStateException("state=" + _state);
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.REQUEST);
List<HttpConfiguration.Customizer> customizers = _configuration.getCustomizers();
if (!customizers.isEmpty())
{
for (HttpConfiguration.Customizer customizer : customizers)
customizer.customize(getConnector(), _configuration, _request);
}
getServer().handle(this);
break;
case ASYNC_DISPATCH:
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ASYNC);
getServer().handleAsync(this);
break;
case ASYNC_EXPIRED:
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR);
Throwable ex=_state.getAsyncContextEvent().getThrowable();
String reason="Async Timeout";
if (ex!=null && !(ex instanceof TimeoutException))
{
reason="Async Exception";
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex);
}
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500));
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE,reason);
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI());
_response.setStatusWithReason(500,reason);
ErrorHandler eh = ErrorHandler.getErrorHandler(getServer(),_state.getContextHandler());
if (eh instanceof ErrorHandler.ErrorPageMapper)
{
String error_page=((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
if (error_page!=null)
_state.getAsyncContextEvent().setDispatchPath(error_page);
}
getServer().handleAsync(this);
break;
case READ_CALLBACK:
List<HttpConfiguration.Customizer> customizers = _configuration.getCustomizers();
if (!customizers.isEmpty())
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_request.getHttpInput());
else
_request.getHttpInput().run();
break;
for (HttpConfiguration.Customizer customizer : customizers)
customizer.customize(getConnector(), _configuration, _request);
}
getServer().handle(this);
break;
}
case ASYNC_DISPATCH:
{
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ASYNC);
getServer().handleAsync(this);
break;
}
case ERROR_DISPATCH:
{
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR);
Throwable ex = _state.getAsyncContextEvent().getThrowable();
String reason;
if (ex == null || ex instanceof TimeoutException)
{
reason = "Async Timeout";
}
else
{
reason = "Async Exception";
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION, ex);
}
case WRITE_CALLBACK:
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE, 500);
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE, reason);
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI, _request.getRequestURI());
_response.setStatusWithReason(500, reason);
ErrorHandler eh = ErrorHandler.getErrorHandler(getServer(), _state.getContextHandler());
if (eh instanceof ErrorHandler.ErrorPageMapper)
{
ContextHandler handler=_state.getContextHandler();
String error_page = ((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
if (error_page != null)
_state.getAsyncContextEvent().setDispatchPath(error_page);
}
if (handler!=null)
handler.handle(_response.getHttpOutput());
getServer().handleAsync(this);
break;
}
case READ_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_request.getHttpInput());
else
_request.getHttpInput().run();
break;
}
case WRITE_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_response.getHttpOutput());
else
_response.getHttpOutput().run();
break;
}
case ASYNC_ERROR:
{
_state.onError();
break;
}
case COMPLETING:
{
try
{
if (!_response.isCommitted() && !_request.isHandled())
_response.sendError(404);
else
_response.getHttpOutput().run();
break;
}
default:
break loop;
_response.closeOutput();
// _state=COMPLETE;
}
catch (Throwable x)
{
// state = ERROR;
}
break;
// async.complete();
// state -> COMPLETING
// flush output
// try { flush(); state -> COMPLETED; }
// catch (x) { state -> ERROR }
// unhandle();
// COMPLETED -> call asyncListeners.onComplete() -> TERMINATED
// ERROR -> call asyncListeners.onError(); -> TERMINATED
// unhandle();
// TERMINATED -> break out of loop.
}
case COMPLETED:
{
_state.onComplete();
// TODO: verify this code is needed and whether
// TODO: it's needed for onError() case too.
_request.setHandled(true);
onCompleted();
// TODO: set action to TERMINATED.
break;
}
default:
{
throw new IllegalStateException("state="+_state);
}
}
catch (EofException|QuietServletException|BadMessageException e)
{
error=true;
LOG.debug(e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
catch (Exception e)
}
catch (EofException|QuietServletException|BadMessageException e)
{
error=true;
LOG.debug(e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
catch (Exception e)
{
error=true;
if (_connector.isStarted())
LOG.warn(String.valueOf(_request.getHttpURI()), e);
else
LOG.debug(String.valueOf(_request.getHttpURI()), e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
catch (Throwable e)
{
if ("ContinuationThrowable".equals(e.getClass().getSimpleName()))
LOG.ignore(e);
else
{
error=true;
if (_connector.isStarted())
LOG.warn(String.valueOf(_request.getHttpURI()), e);
else
LOG.debug(String.valueOf(_request.getHttpURI()), e);
LOG.warn(String.valueOf(_request.getHttpURI()), e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
catch (Throwable e)
{
if ("ContinuationThrowable".equals(e.getClass().getSimpleName()))
LOG.ignore(e);
else
{
error=true;
if (_connector.isStarted())
LOG.warn(String.valueOf(_request.getHttpURI()), e);
else
LOG.debug(String.valueOf(_request.getHttpURI()), e);
LOG.warn(String.valueOf(_request.getHttpURI()), e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
}
finally
{
if (error && _state.isAsyncStarted())
_state.errorComplete();
action = _state.unhandle();
}
}
if (action==Action.COMPLETE)
finally
{
try
{
_state.completed();
if (!_response.isCommitted() && !_request.isHandled())
{
_response.sendError(404);
}
else
{
// Complete generating the response
_response.closeOutput();
}
}
catch(EofException|ClosedChannelException e)
{
LOG.debug(e);
}
catch(Exception e)
{
LOG.warn("complete failed",e);
}
finally
{
_request.setHandled(true);
onCompleted();
}
if (error && _state.isAsyncStarted())
_state.errorComplete();
action = _state.unhandle();
}
}
finally
{
}
if (LOG.isDebugEnabled())
LOG.debug("{} handle exit, result {}", this, action);
@ -531,7 +553,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
HttpFields fields = _response.getHttpFields();
if (_configuration.getSendDateHeader() && !fields.contains(HttpHeader.DATE))
fields.put(_connector.getServer().getDateField());
_request.setMetaData(request);
}
@ -539,7 +561,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (LOG.isDebugEnabled())
LOG.debug("{} content {}", this, content);
return _request.getHttpInput().addContent(content);
}
@ -554,10 +576,10 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
if (_requestLog!=null )
_requestLog.log(_request, _response);
_transport.onCompleted();
}
public boolean onEarlyEOF()
{
return _request.getHttpInput().earlyEOF();
@ -570,7 +592,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
try
{
if (_state.handling()==Action.REQUEST_DISPATCH)
if (_state.handling()==Action.DISPATCH)
{
ByteBuffer content=null;
HttpFields fields=new HttpFields();
@ -588,10 +610,11 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
}
finally
{
if (_state.unhandle()==Action.COMPLETE)
_state.completed();
else
throw new IllegalStateException();
// TODO: review whether it's the right state to check.
if (_state.unhandle()==Action.COMPLETING)
_state.onComplete();
else
throw new IllegalStateException(); // TODO: don't throw from finally blocks !
onCompleted();
}
}
@ -605,7 +628,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
if (info==null)
info = _response.newResponseMetaData();
commit(info);
// wrap callback to process 100 responses
final int status=info.getStatus();
final Callback committed = (status<200&&status>=100)?new Commit100Callback(callback):new CommitCallback(callback);
@ -641,7 +664,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
throw failure;
}
}
protected void commit (MetaData.Response info)
{
_committedMetaData=info;
@ -667,7 +690,7 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
_written+=BufferUtil.length(content);
sendResponse(null,content,complete,callback);
}
public HttpOutput.Interceptor getNextInterceptor()
{
return null;
@ -712,13 +735,13 @@ public class HttpChannel implements Runnable, HttpOutput.Interceptor
{
_callback = callback;
}
@Override
public boolean isNonBlocking()
{
return _callback.isNonBlocking();
}
@Override
public void succeeded()
{

View File

@ -44,17 +44,18 @@ public class HttpChannelState
private final static long DEFAULT_TIMEOUT=Long.getLong("org.eclipse.jetty.server.HttpChannelState.DEFAULT_TIMEOUT",30000L);
/** The dispatched state of the HttpChannel, used to control the overall livecycle
/**
* The dispatched state of the HttpChannel, used to control the overall lifecycle
*/
public enum State
{
IDLE, // Idle request
DISPATCHED, // Request dispatched to filter/servlet
ASYNC_WAIT, // Suspended and parked
ASYNC_WOKEN, // A thread has been dispatch to handle from ASYNCWAIT
ASYNC_IO, // Has been dispatched for async IO
COMPLETING, // Request is completable
COMPLETED, // Request is complete
ASYNC_WAIT, // Suspended and waiting
ASYNC_WOKEN, // Dispatch to handle from ASYNC_WAIT
ASYNC_IO, // Dispatched for async IO
COMPLETING, // Response is completable
COMPLETED, // Response is completed
UPGRADED // Request upgraded the connection
}
@ -63,13 +64,16 @@ public class HttpChannelState
*/
public enum Action
{
REQUEST_DISPATCH, // handle a normal request dispatch
DISPATCH, // handle a normal request dispatch
ASYNC_DISPATCH, // handle an async request dispatch
ASYNC_EXPIRED, // handle an async timeout
ERROR_DISPATCH, // handle a normal error
ASYNC_ERROR, // handle an async error
WRITE_CALLBACK, // handle an IO write callback
READ_CALLBACK, // handle an IO read callback
COMPLETING, // Completing the response
COMPLETED, // Response completed
TERMINATED, // No further actions
WAIT, // Wait for further events
COMPLETE // Complete the channel
}
/**
@ -79,11 +83,13 @@ public class HttpChannelState
*/
public enum Async
{
STARTED,
DISPATCH,
COMPLETE,
EXPIRING,
EXPIRED
STARTED, // AsyncContext.startAsync() has been called
DISPATCH, //
COMPLETE, // AsyncContext.complete() has been called
EXPIRING, // AsyncContext timeout just happened
EXPIRED, // AsyncContext timeout has been processed
ERRORING, // An error just happened
ERRORED // The error has been processed
}
private final boolean DEBUG=LOG.isDebugEnabled();
@ -161,11 +167,16 @@ public class HttpChannelState
}
}
private String getStatusStringLocked()
{
return String.format("s=%s i=%b a=%s",_state,_initial,_async);
}
public String getStatusString()
{
try(Locker.Lock lock= _locker.lock())
{
return String.format("s=%s i=%b a=%s",_state,_initial,_async);
return getStatusStringLocked();
}
}
@ -183,10 +194,10 @@ public class HttpChannelState
case IDLE:
_initial=true;
_state=State.DISPATCHED;
return Action.REQUEST_DISPATCH;
return Action.DISPATCH;
case COMPLETING:
return Action.COMPLETE;
return Action.COMPLETING;
case COMPLETED:
return Action.WAIT;
@ -214,7 +225,7 @@ public class HttpChannelState
{
case COMPLETE:
_state=State.COMPLETING;
return Action.COMPLETE;
return Action.COMPLETING;
case DISPATCH:
_state=State.DISPATCHED;
_async=null;
@ -224,9 +235,14 @@ public class HttpChannelState
case EXPIRED:
_state=State.DISPATCHED;
_async=null;
return Action.ASYNC_EXPIRED;
return Action.ERROR_DISPATCH;
case STARTED:
return Action.WAIT;
case ERRORING:
_state=State.DISPATCHED;
return Action.ASYNC_ERROR;
default:
throw new IllegalStateException(String.valueOf(async));
}
}
@ -245,7 +261,7 @@ public class HttpChannelState
try(Locker.Lock lock= _locker.lock())
{
if (_state!=State.DISPATCHED || _async!=null)
throw new IllegalStateException(this.getStatusString());
throw new IllegalStateException(this.getStatusStringLocked());
_async=Async.STARTED;
_event=event;
@ -263,6 +279,7 @@ public class HttpChannelState
}
catch(Exception e)
{
// TODO Async Dispatch Error
LOG.warn(e);
}
}
@ -274,7 +291,7 @@ public class HttpChannelState
try(Locker.Lock lock= _locker.lock())
{
if (_event!=null)
_event.setThrowable(th);
_event.addThrowable(th);
}
}
@ -298,11 +315,15 @@ public class HttpChannelState
{
switch(_state)
{
case COMPLETED:
return Action.COMPLETED;
case DISPATCHED:
case ASYNC_IO:
break;
default:
throw new IllegalStateException(this.getStatusString());
throw new IllegalStateException(this.getStatusStringLocked());
}
if (_async!=null)
@ -313,19 +334,19 @@ public class HttpChannelState
case COMPLETE:
_state=State.COMPLETING;
_async=null;
action = Action.COMPLETE;
action=Action.COMPLETING;
break;
case DISPATCH:
_state=State.DISPATCHED;
_async=null;
action = Action.ASYNC_DISPATCH;
action=Action.ASYNC_DISPATCH;
break;
case EXPIRED:
_state=State.DISPATCHED;
_async=null;
action = Action.ASYNC_EXPIRED;
action = Action.ERROR_DISPATCH;
break;
case STARTED:
@ -339,33 +360,44 @@ public class HttpChannelState
{
_asyncWrite=false;
_state=State.ASYNC_IO;
action = Action.WRITE_CALLBACK;
action=Action.WRITE_CALLBACK;
}
else
{
schedule_event=_event;
read_interested=_asyncReadUnready;
_state=State.ASYNC_WAIT;
action = Action.WAIT;
action=Action.WAIT;
}
break;
case EXPIRING:
schedule_event=_event;
_state=State.ASYNC_WAIT;
action = Action.WAIT;
action=Action.WAIT;
break;
case ERRORING:
_state=State.DISPATCHED;
action=Action.ERROR_DISPATCH;
break;
case ERRORED:
_state=State.DISPATCHED;
action=Action.ASYNC_ERROR;
_async=null;
break;
default:
_state=State.COMPLETING;
action = Action.COMPLETE;
action=Action.COMPLETING;
break;
}
}
else
{
_state=State.COMPLETING;
action = Action.COMPLETE;
action=Action.COMPLETING;
}
}
@ -381,8 +413,15 @@ public class HttpChannelState
boolean dispatch;
try(Locker.Lock lock= _locker.lock())
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException("AsyncContext#dispath "+this.getStatusString());
switch(_async)
{
case STARTED:
case EXPIRING:
case ERRORED:
break;
default:
throw new IllegalStateException(this.getStatusStringLocked());
}
_async=Async.DISPATCH;
if (context!=null)
@ -415,9 +454,9 @@ public class HttpChannelState
scheduleDispatch();
}
protected void expired()
protected void onTimeout()
{
final List<AsyncListener> aListeners;
final List<AsyncListener> listeners;
AsyncContextEvent event;
try(Locker.Lock lock= _locker.lock())
{
@ -425,15 +464,15 @@ public class HttpChannelState
return;
_async=Async.EXPIRING;
event=_event;
aListeners=_asyncListeners;
listeners=_asyncListeners;
}
if (LOG.isDebugEnabled())
LOG.debug("Async timeout {}",this);
if (aListeners!=null)
if (listeners!=null)
{
for (AsyncListener listener : aListeners)
for (AsyncListener listener : listeners)
{
try
{
@ -442,8 +481,8 @@ public class HttpChannelState
catch(Exception e)
{
LOG.debug(e);
event.setThrowable(e);
_channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,e);
event.addThrowable(e);
_channel.getRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
break;
}
}
@ -454,7 +493,17 @@ public class HttpChannelState
{
if (_async==Async.EXPIRING)
{
_async=Async.EXPIRED;
// If the listeners did not call dispatch() or complete(),
// then the container must generate an error.
if (event.getThrowable()==null)
{
_async=Async.EXPIRED;
_event.addThrowable(new TimeoutException("Async API violation"));
}
else
{
_async=Async.ERRORING;
}
if (_state==State.ASYNC_WAIT)
{
_state=State.ASYNC_WOKEN;
@ -477,8 +526,15 @@ public class HttpChannelState
boolean handle=false;
try(Locker.Lock lock= _locker.lock())
{
if (_async!=Async.STARTED && _async!=Async.EXPIRING)
throw new IllegalStateException(this.getStatusString());
switch(_async)
{
case STARTED:
case EXPIRING:
case ERRORED:
break;
default:
throw new IllegalStateException(this.getStatusStringLocked());
}
_async=Async.COMPLETE;
if (_state==State.ASYNC_WAIT)
{
@ -510,22 +566,58 @@ public class HttpChannelState
cancelTimeout();
}
protected void completed()
protected void onError()
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
try(Locker.Lock lock= _locker.lock())
{
if (_state!=State.DISPATCHED/* || _async!=Async.ERRORING*/)
throw new IllegalStateException(this.getStatusStringLocked());
aListeners=_asyncListeners;
event=_event;
_async=Async.ERRORED;
}
if (event!=null && aListeners!=null)
{
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
for (AsyncListener listener : aListeners)
{
try
{
listener.onError(event);
}
catch(Exception x)
{
LOG.info("Exception while invoking listener " + listener, x);
}
}
}
}
protected void onComplete()
{
final List<AsyncListener> aListeners;
final AsyncContextEvent event;
try(Locker.Lock lock= _locker.lock())
{
switch(_state)
{
case COMPLETING:
_state=State.COMPLETED;
aListeners=_asyncListeners;
event=_event;
_state=State.COMPLETED;
_async=null;
break;
default:
throw new IllegalStateException(this.getStatusString());
throw new IllegalStateException(this.getStatusStringLocked());
}
}
@ -533,20 +625,11 @@ public class HttpChannelState
{
if (aListeners!=null)
{
if (event.getThrowable()!=null)
{
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_EXCEPTION,event.getThrowable());
event.getSuppliedRequest().setAttribute(RequestDispatcher.ERROR_MESSAGE,event.getThrowable().getMessage());
}
for (AsyncListener listener : aListeners)
{
try
{
if (event.getThrowable()!=null)
listener.onError(event);
else
listener.onComplete(event);
listener.onComplete(event);
}
catch(Exception e)
{
@ -554,7 +637,6 @@ public class HttpChannelState
}
}
}
event.completed();
}
}
@ -568,7 +650,7 @@ public class HttpChannelState
{
case DISPATCHED:
case ASYNC_IO:
throw new IllegalStateException(getStatusString());
throw new IllegalStateException(getStatusStringLocked());
case UPGRADED:
return;
default:
@ -596,7 +678,7 @@ public class HttpChannelState
case COMPLETED:
break;
default:
throw new IllegalStateException(getStatusString());
throw new IllegalStateException(getStatusStringLocked());
}
_asyncListeners=null;
_state=State.UPGRADED;

View File

@ -18,11 +18,6 @@
package org.eclipse.jetty.server;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@ -47,6 +42,11 @@ import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
public class AsyncRequestReadTest
{
private static Server server;
@ -75,11 +75,11 @@ public class AsyncRequestReadTest
{
server.setHandler(new AsyncStreamHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(1000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)120);
@ -93,8 +93,8 @@ public class AsyncRequestReadTest
byte[] h=header.getBytes(StandardCharsets.ISO_8859_1);
out.write(h);
out.write(content);
header=
"POST / HTTP/1.1\r\n"+
"Host: localhost\r\n"+
@ -123,7 +123,7 @@ public class AsyncRequestReadTest
{
server.setHandler(new AsyncStreamHandler());
server.start();
asyncReadTest(64,4,4,20);
asyncReadTest(256,16,16,50);
asyncReadTest(256,1,128,10);
@ -184,7 +184,7 @@ public class AsyncRequestReadTest
final AsyncContext async = request.startAsync();
// System.err.println("handle "+request.getContentLength());
new Thread()
{
@Override
@ -194,7 +194,7 @@ public class AsyncRequestReadTest
try(InputStream in = request.getInputStream();)
{
// System.err.println("reading...");
byte[] b = new byte[4*4096];
int read;
while((read =in.read(b))>=0)
@ -216,18 +216,18 @@ public class AsyncRequestReadTest
}.start();
}
}
@Test
public void testPartialRead() throws Exception
{
server.setHandler(new PartialReaderHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(1000);
socket.setSoTimeout(10000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)88);
@ -241,7 +241,7 @@ public class AsyncRequestReadTest
byte[] h=header.getBytes(StandardCharsets.ISO_8859_1);
out.write(h);
out.write(content);
header= "POST /?read=10 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: "+content.length+"\r\n"+
@ -252,7 +252,7 @@ public class AsyncRequestReadTest
out.write(h);
out.write(content);
out.flush();
BufferedReader in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
assertThat(in.readLine(),containsString("HTTP/1.1 200 OK"));
assertThat(in.readLine(),containsString("Content-Length:"));
@ -273,11 +273,11 @@ public class AsyncRequestReadTest
{
server.setHandler(new PartialReaderHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(10000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)88);
@ -308,11 +308,11 @@ public class AsyncRequestReadTest
{
server.setHandler(new PartialReaderHandler());
server.start();
try (final Socket socket = new Socket("localhost",connector.getLocalPort()))
{
socket.setSoTimeout(1000);
byte[] content = new byte[32*4096];
Arrays.fill(content, (byte)88);
@ -334,7 +334,7 @@ public class AsyncRequestReadTest
assertThat(in.readLine(),containsString("Server:"));
in.readLine();
assertThat(in.readLine(),containsString("XXXXXXX"));
socket.close();
}
}
@ -346,7 +346,7 @@ public class AsyncRequestReadTest
{
httpResponse.setStatus(200);
request.setHandled(true);
BufferedReader in = request.getReader();
PrintWriter out =httpResponse.getWriter();
int read=Integer.valueOf(request.getParameter("read"));

View File

@ -18,8 +18,6 @@
package org.eclipse.jetty.servlet;
import static org.junit.Assert.assertEquals;
import java.io.IOException;
import java.io.InputStream;
import java.net.Socket;
@ -43,6 +41,7 @@ import javax.servlet.http.HttpServletResponse;
import javax.servlet.http.HttpServletResponseWrapper;
import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.QuietServletException;
import org.eclipse.jetty.server.Request;
import org.eclipse.jetty.server.RequestLog;
import org.eclipse.jetty.server.Response;
@ -60,6 +59,10 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import static org.hamcrest.Matchers.startsWith;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat;
@RunWith(AdvancedRunner.class)
public class AsyncServletTest
@ -79,7 +82,7 @@ public class AsyncServletTest
{
_connector = new ServerConnector(_server);
_server.setConnectors(new Connector[]{ _connector });
_log=new ArrayList<>();
RequestLog log=new Log();
RequestLogHandler logHandler = new RequestLogHandler();
@ -91,7 +94,7 @@ public class AsyncServletTest
ServletContextHandler context = new ServletContextHandler(ServletContextHandler.NO_SESSIONS);
context.setContextPath("/ctx");
logHandler.setHandler(context);
_servletHandler=context.getServletHandler();
ServletHolder holder=new ServletHolder(_servlet);
holder.setAsyncSupported(true);
@ -116,7 +119,7 @@ public class AsyncServletTest
public void testNormal() throws Exception
{
String response=process(null,null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n",response);
@ -129,7 +132,7 @@ public class AsyncServletTest
public void testSleep() throws Exception
{
String response=process("sleep=200",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n",response);
@ -139,32 +142,32 @@ public class AsyncServletTest
}
@Test
public void testSuspend() throws Exception
public void testStart() throws Exception
{
_expectedCode="500 ";
String response=process("suspend=200",null);
String response=process("start=200",null);
Assert.assertThat(response,Matchers.startsWith("HTTP/1.1 500 Async Timeout"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("ERROR: /ctx/path/info",response);
assertContains("ERROR DISPATCH: /ctx/path/info",response);
}
@Test
public void testSuspendOnTimeoutDispatch() throws Exception
public void testStartOnTimeoutDispatch() throws Exception
{
String response=process("suspend=200&timeout=dispatch",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=200&timeout=dispatch",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
@ -175,14 +178,71 @@ public class AsyncServletTest
}
@Test
public void testSuspendOnTimeoutComplete() throws Exception
public void testStartOnTimeoutError() throws Exception
{
String response=process("suspend=200&timeout=complete",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
_expectedCode="500 ";
String response=process("start=200&timeout=error",null);
assertThat(response,startsWith("HTTP/1.1 500 Async Exception\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: error\r\n"+
"history: onError\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("ERROR DISPATCH",response);
}
@Test
public void testStartOnTimeoutErrorComplete() throws Exception
{
String response=process("start=200&timeout=error&error=complete",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: error\r\n"+
"history: onError\r\n"+
"history: complete\r\n",response);
assertContains("COMPLETED",response);
}
@Test
public void testStartOnTimeoutErrorDispatch() throws Exception
{
String response=process("start=200&timeout=error&error=dispatch",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: error\r\n"+
"history: onError\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("DISPATCHED",response);
}
@Test
public void testStartOnTimeoutComplete() throws Exception
{
String response=process("start=200&timeout=complete",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: complete\r\n"+
"history: onComplete\r\n",response);
@ -191,15 +251,15 @@ public class AsyncServletTest
}
@Test
public void testSuspendWaitResume() throws Exception
public void testStartWaitDispatch() throws Exception
{
String response=process("suspend=200&resume=10",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=200&dispatch=10",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
@ -207,15 +267,15 @@ public class AsyncServletTest
}
@Test
public void testSuspendResume() throws Exception
public void testStartDispatch() throws Exception
{
String response=process("suspend=200&resume=0",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=200&dispatch=0",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
@ -223,14 +283,32 @@ public class AsyncServletTest
}
@Test
public void testSuspendWaitComplete() throws Exception
public void testStartError() throws Exception
{
String response=process("suspend=200&complete=50",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
_expectedCode="500 ";
String response=process("start=200&throw=1",null);
assertThat(response,startsWith("HTTP/1.1 500 Server Error\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
/* TODO should there be an onError call?
"history: onError\r\n"+
"history: onComplete\r\n"
*/"",
response);
assertContains("HTTP ERROR: 500",response);
}
@Test
public void testStartWaitComplete() throws Exception
{
String response=process("start=200&complete=50",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: start\r\n"+
"history: complete\r\n"+
"history: onComplete\r\n",response);
assertContains("COMPLETED",response);
@ -239,14 +317,14 @@ public class AsyncServletTest
}
@Test
public void testSuspendComplete() throws Exception
public void testStartComplete() throws Exception
{
String response=process("suspend=200&complete=0",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=200&complete=0",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: complete\r\n"+
"history: onComplete\r\n",response);
assertContains("COMPLETED",response);
@ -255,19 +333,20 @@ public class AsyncServletTest
}
@Test
public void testSuspendWaitResumeSuspendWaitResume() throws Exception
public void testStartWaitDispatchStartWaitDispatch() throws Exception
{
String response=process("suspend=1000&resume=10&suspend2=1000&resume2=10",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=1000&dispatch=10&start2=1000&dispatch2=10",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: onStartAsync\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
@ -275,58 +354,61 @@ public class AsyncServletTest
}
@Test
public void testSuspendWaitResumeSuspendComplete() throws Exception
public void testStartWaitDispatchStartComplete() throws Exception
{
String response=process("suspend=1000&resume=10&suspend2=1000&complete2=10",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=1000&dispatch=10&start2=1000&complete2=10",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: suspend\r\n"+
"history: onStartAsync\r\n"+
"history: start\r\n"+
"history: complete\r\n"+
"history: onComplete\r\n",response);
assertContains("COMPLETED",response);
}
@Test
public void testSuspendWaitResumeSuspend() throws Exception
public void testStartWaitDispatchStart() throws Exception
{
_expectedCode="500 ";
String response=process("suspend=1000&resume=10&suspend2=10",null);
String response=process("start=1000&dispatch=10&start2=10",null);
assertEquals("HTTP/1.1 500 Async Timeout",response.substring(0,26));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: suspend\r\n"+
"history: onStartAsync\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("ERROR: /ctx/path/info",response);
assertContains("ERROR DISPATCH: /ctx/path/info",response);
}
@Test
public void testSuspendTimeoutSuspendResume() throws Exception
public void testStartTimeoutStartDispatch() throws Exception
{
String response=process("suspend=10&suspend2=1000&resume2=10",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=10&start2=1000&dispatch2=10",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: onStartAsync\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
@ -334,53 +416,55 @@ public class AsyncServletTest
}
@Test
public void testSuspendTimeoutSuspendComplete() throws Exception
public void testStartTimeoutStartComplete() throws Exception
{
String response=process("suspend=10&suspend2=1000&complete2=10",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=10&start2=1000&complete2=10",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: suspend\r\n"+
"history: onStartAsync\r\n"+
"history: start\r\n"+
"history: complete\r\n"+
"history: onComplete\r\n",response);
assertContains("COMPLETED",response);
}
@Test
public void testSuspendTimeoutSuspend() throws Exception
public void testStartTimeoutStart() throws Exception
{
_expectedCode="500 ";
String response=process("suspend=10&suspend2=10",null);
String response=process("start=10&start2=10",null);
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: suspend\r\n"+
"history: onStartAsync\r\n"+
"history: start\r\n"+
"history: onTimeout\r\n"+
"history: ERROR /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("ERROR: /ctx/path/info",response);
assertContains("ERROR DISPATCH: /ctx/path/info",response);
}
@Test
public void testWrapStartDispatch() throws Exception
{
String response=process("wrap=true&suspend=200&resume=20",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("wrap=true&start=200&dispatch=20",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: wrapped REQ RSP\r\n"+
"history: !initial\r\n"+
@ -392,49 +476,49 @@ public class AsyncServletTest
@Test
public void testStartDispatchEncodedPath() throws Exception
{
String response=process("suspend=200&resume=20&path=/p%20th3",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("start=200&dispatch=20&path=/p%20th3",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/p%20th3\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("DISPATCHED",response);
}
@Test
public void testFwdStartDispatch() throws Exception
{
String response=process("fwd","suspend=200&resume=20",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("fwd","start=200&dispatch=20",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: FWD REQUEST /ctx/fwd/info\r\n"+
"history: FORWARD /ctx/path1\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: FWD ASYNC /ctx/fwd/info\r\n"+
"history: FORWARD /ctx/path1\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("DISPATCHED",response);
}
@Test
public void testFwdStartDispatchPath() throws Exception
{
String response=process("fwd","suspend=200&resume=20&path=/path2",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("fwd","start=200&dispatch=20&path=/path2",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: FWD REQUEST /ctx/fwd/info\r\n"+
"history: FORWARD /ctx/path1\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path2\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
@ -444,45 +528,45 @@ public class AsyncServletTest
@Test
public void testFwdWrapStartDispatch() throws Exception
{
String response=process("fwd","wrap=true&suspend=200&resume=20",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("fwd","wrap=true&start=200&dispatch=20",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: FWD REQUEST /ctx/fwd/info\r\n"+
"history: FORWARD /ctx/path1\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path1\r\n"+
"history: wrapped REQ RSP\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("DISPATCHED",response);
}
@Test
public void testFwdWrapStartDispatchPath() throws Exception
{
String response=process("fwd","wrap=true&suspend=200&resume=20&path=/path2",null);
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
String response=process("fwd","wrap=true&start=200&dispatch=20&path=/path2",null);
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: FWD REQUEST /ctx/fwd/info\r\n"+
"history: FORWARD /ctx/path1\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: resume\r\n"+
"history: start\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path2\r\n"+
"history: wrapped REQ RSP\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
assertContains("DISPATCHED",response);
}
@Test
public void testAsyncRead() throws Exception
{
_expectedLogs=2;
String header="GET /ctx/path/info?suspend=2000&resume=1500 HTTP/1.1\r\n"+
String header="GET /ctx/path/info?start=2000&dispatch=1500 HTTP/1.1\r\n"+
"Host: localhost\r\n"+
"Content-Length: 10\r\n"+
"\r\n";
@ -503,13 +587,13 @@ public class AsyncServletTest
socket.getOutputStream().write(close.getBytes(StandardCharsets.ISO_8859_1));
String response = IO.toString(socket.getInputStream());
assertEquals("HTTP/1.1 200 OK",response.substring(0,15));
assertThat(response,startsWith("HTTP/1.1 200 OK\r\n"));
assertContains(
"history: REQUEST /ctx/path/info\r\n"+
"history: initial\r\n"+
"history: suspend\r\n"+
"history: start\r\n"+
"history: async-read=10\r\n"+
"history: resume\r\n"+
"history: dispatch\r\n"+
"history: ASYNC /ctx/path/info\r\n"+
"history: !initial\r\n"+
"history: onComplete\r\n",response);
@ -520,7 +604,7 @@ public class AsyncServletTest
{
return process("path",query,content);
}
public synchronized String process(String path,String query,String content) throws Exception
{
String request = "GET /ctx/"+path+"/info";
@ -543,6 +627,7 @@ public class AsyncServletTest
{
socket.setSoTimeout(1000000);
socket.getOutputStream().write(request.getBytes(StandardCharsets.UTF_8));
socket.getOutputStream().flush();
return IO.toString(socket.getInputStream());
}
catch(Exception e)
@ -574,7 +659,7 @@ public class AsyncServletTest
request.getServletContext().getRequestDispatcher("/path1").forward(request,response);
}
}
private static class AsyncServlet extends HttpServlet
{
private static final long serialVersionUID = -8161977157098646562L;
@ -593,36 +678,36 @@ public class AsyncServletTest
{
// ignored
}
// System.err.println(request.getDispatcherType()+" "+request.getRequestURI());
response.addHeader("history",request.getDispatcherType()+" "+request.getRequestURI());
if (request instanceof ServletRequestWrapper || response instanceof ServletResponseWrapper)
response.addHeader("history","wrapped"+((request instanceof ServletRequestWrapper)?" REQ":"")+((response instanceof ServletResponseWrapper)?" RSP":""));
boolean wrap="true".equals(request.getParameter("wrap"));
int read_before=0;
long sleep_for=-1;
long suspend_for=-1;
long suspend2_for=-1;
long resume_after=-1;
long resume2_after=-1;
long start_for=-1;
long start2_for=-1;
long dispatch_after=-1;
long dispatch2_after=-1;
long complete_after=-1;
long complete2_after=-1;
if (request.getParameter("read")!=null)
read_before=Integer.parseInt(request.getParameter("read"));
if (request.getParameter("sleep")!=null)
sleep_for=Integer.parseInt(request.getParameter("sleep"));
if (request.getParameter("suspend")!=null)
suspend_for=Integer.parseInt(request.getParameter("suspend"));
if (request.getParameter("suspend2")!=null)
suspend2_for=Integer.parseInt(request.getParameter("suspend2"));
if (request.getParameter("resume")!=null)
resume_after=Integer.parseInt(request.getParameter("resume"));
if (request.getParameter("start")!=null)
start_for=Integer.parseInt(request.getParameter("start"));
if (request.getParameter("start2")!=null)
start2_for=Integer.parseInt(request.getParameter("start2"));
if (request.getParameter("dispatch")!=null)
dispatch_after=Integer.parseInt(request.getParameter("dispatch"));
final String path=request.getParameter("path");
if (request.getParameter("resume2")!=null)
resume2_after=Integer.parseInt(request.getParameter("resume2"));
if (request.getParameter("dispatch2")!=null)
dispatch2_after=Integer.parseInt(request.getParameter("dispatch2"));
if (request.getParameter("complete")!=null)
complete_after=Integer.parseInt(request.getParameter("complete"));
if (request.getParameter("complete2")!=null)
@ -669,13 +754,16 @@ public class AsyncServletTest
}.start();
}
if (suspend_for>=0)
if (start_for>=0)
{
final AsyncContext async=wrap?request.startAsync(new HttpServletRequestWrapper(request),new HttpServletResponseWrapper(response)):request.startAsync();
if (suspend_for>0)
async.setTimeout(suspend_for);
if (start_for>0)
async.setTimeout(start_for);
async.addListener(__listener);
response.addHeader("history","suspend");
response.addHeader("history","start");
if ("1".equals(request.getParameter("throw")))
throw new QuietServletException(new Exception("test throw in async 1"));
if (complete_after>0)
{
@ -709,15 +797,15 @@ public class AsyncServletTest
response.addHeader("history","complete");
async.complete();
}
else if (resume_after>0)
else if (dispatch_after>0)
{
TimerTask resume = new TimerTask()
TimerTask dispatch = new TimerTask()
{
@Override
public void run()
{
((HttpServletResponse)async.getResponse()).addHeader("history","resume");
if (path!=null)
((HttpServletResponse)async.getResponse()).addHeader("history","dispatch");
if (path!=null)
{
int q=path.indexOf('?');
String uriInContext=(q>=0)
@ -731,13 +819,13 @@ public class AsyncServletTest
};
synchronized (_timer)
{
_timer.schedule(resume,resume_after);
_timer.schedule(dispatch,dispatch_after);
}
}
else if (resume_after==0)
else if (dispatch_after==0)
{
((HttpServletResponse)async.getResponse()).addHeader("history","resume");
if (path!=null)
((HttpServletResponse)async.getResponse()).addHeader("history","dispatch");
if (path!=null)
async.dispatch(path);
else
async.dispatch();
@ -767,18 +855,20 @@ public class AsyncServletTest
{
response.addHeader("history","!initial");
if (suspend2_for>=0 && request.getAttribute("2nd")==null)
if (start2_for>=0 && request.getAttribute("2nd")==null)
{
final AsyncContext async=wrap?request.startAsync(new HttpServletRequestWrapper(request),new HttpServletResponseWrapper(response)):request.startAsync();
async.addListener(__listener);
request.setAttribute("2nd","cycle");
if (suspend2_for>0)
if (start2_for>0)
{
async.setTimeout(suspend2_for);
async.setTimeout(start2_for);
}
// continuation.addContinuationListener(__listener);
response.addHeader("history","suspend");
response.addHeader("history","start");
if ("2".equals(request.getParameter("throw")))
throw new QuietServletException(new Exception("test throw in async 2"));
if (complete2_after>0)
{
@ -812,23 +902,23 @@ public class AsyncServletTest
response.addHeader("history","complete");
async.complete();
}
else if (resume2_after>0)
else if (dispatch2_after>0)
{
TimerTask resume = new TimerTask()
TimerTask dispatch = new TimerTask()
{
@Override
public void run()
{
response.addHeader("history","resume");
response.addHeader("history","dispatch");
async.dispatch();
}
};
synchronized (_timer)
{
_timer.schedule(resume,resume2_after);
_timer.schedule(dispatch,dispatch2_after);
}
}
else if (resume2_after==0)
else if (dispatch2_after==0)
{
response.addHeader("history","dispatch");
async.dispatch();
@ -836,7 +926,7 @@ public class AsyncServletTest
}
else if(request.getDispatcherType()==DispatcherType.ERROR)
{
response.getOutputStream().println("ERROR: "+request.getContextPath()+request.getServletPath()+request.getPathInfo());
response.getOutputStream().println("ERROR DISPATCH: "+request.getContextPath()+request.getServletPath()+request.getPathInfo());
}
else
{
@ -858,12 +948,20 @@ public class AsyncServletTest
if (action!=null)
{
((HttpServletResponse)event.getSuppliedResponse()).addHeader("history",action);
if ("dispatch".equals(action))
event.getAsyncContext().dispatch();
if ("complete".equals(action))
switch(action)
{
event.getSuppliedResponse().getOutputStream().println("COMPLETED\n");
event.getAsyncContext().complete();
case "dispatch":
event.getAsyncContext().dispatch();
break;
case "complete":
event.getSuppliedResponse().getOutputStream().println("COMPLETED\n");
event.getAsyncContext().complete();
break;
case "error":
throw new RuntimeException("error in onTimeout");
}
}
}
@ -871,11 +969,30 @@ public class AsyncServletTest
@Override
public void onStartAsync(AsyncEvent event) throws IOException
{
((HttpServletResponse)event.getSuppliedResponse()).addHeader("history","onStartAsync");
}
@Override
public void onError(AsyncEvent event) throws IOException
{
((HttpServletResponse)event.getSuppliedResponse()).addHeader("history","onError");
String action=event.getSuppliedRequest().getParameter("error");
if (action!=null)
{
((HttpServletResponse)event.getSuppliedResponse()).addHeader("history",action);
switch(action)
{
case "dispatch":
event.getAsyncContext().dispatch();
break;
case "complete":
event.getSuppliedResponse().getOutputStream().println("COMPLETED\n");
event.getAsyncContext().complete();
break;
}
}
}
@Override
@ -889,7 +1006,7 @@ public class AsyncServletTest
{
@Override
public void log(Request request, Response response)
{
{
int status = response.getCommittedMetaData().getStatus();
long written = response.getHttpChannel().getBytesWritten();
_log.add(status+" "+written+" "+request.getRequestURI());