422703 Support reentrant HttpChannel and HttpConnection

This commit is contained in:
Greg Wilkins 2013-11-28 12:25:04 +11:00
parent 1b30b0f9a8
commit 9c013b723b
2 changed files with 129 additions and 113 deletions

View File

@ -78,9 +78,14 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
return __currentChannel.get(); return __currentChannel.get();
} }
protected static void setCurrentHttpChannel(HttpChannel<?> channel) protected static HttpChannel<?> setCurrentHttpChannel(HttpChannel<?> channel)
{ {
__currentChannel.set(channel); HttpChannel<?> last=__currentChannel.get();
if (channel==null)
__currentChannel.remove();
else
__currentChannel.set(channel);
return last;
} }
private final AtomicBoolean _committed = new AtomicBoolean(); private final AtomicBoolean _committed = new AtomicBoolean();
@ -246,7 +251,7 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
{ {
LOG.debug("{} handle enter", this); LOG.debug("{} handle enter", this);
setCurrentHttpChannel(this); final HttpChannel<?>last = setCurrentHttpChannel(this);
String threadName = null; String threadName = null;
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
@ -255,125 +260,131 @@ public class HttpChannel<T> implements HttpParser.RequestHandler<T>, Runnable
Thread.currentThread().setName(threadName + " - " + _uri); Thread.currentThread().setName(threadName + " - " + _uri);
} }
// Loop here to handle async request redispatches.
// The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called.
HttpChannelState.Action action = _state.handling(); HttpChannelState.Action action = _state.handling();
loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && getServer().isRunning()) try
{ {
boolean error=false; // Loop here to handle async request redispatches.
try // The loop is controlled by the call to async.unhandle in the
// finally block below. Unhandle will return false only if an async dispatch has
// already happened when unhandle is called.
loop: while (action.ordinal()<HttpChannelState.Action.WAIT.ordinal() && getServer().isRunning())
{ {
LOG.debug("{} action {}",this,action); boolean error=false;
try
switch(action)
{ {
case REQUEST_DISPATCH: LOG.debug("{} action {}",this,action);
_request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setTimeStamp(System.currentTimeMillis());
_request.setDispatcherType(DispatcherType.REQUEST);
for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers()) switch(action)
customizer.customize(getConnector(),_configuration,_request); {
getServer().handle(this); case REQUEST_DISPATCH:
break; _request.setHandled(false);
_response.getHttpOutput().reopen();
_request.setTimeStamp(System.currentTimeMillis());
_request.setDispatcherType(DispatcherType.REQUEST);
case ASYNC_DISPATCH: for (HttpConfiguration.Customizer customizer : _configuration.getCustomizers())
_request.setHandled(false); customizer.customize(getConnector(),_configuration,_request);
_response.getHttpOutput().reopen(); getServer().handle(this);
_request.setDispatcherType(DispatcherType.ASYNC); break;
getServer().handleAsync(this);
break;
case ASYNC_EXPIRED: case ASYNC_DISPATCH:
_request.setHandled(false); _request.setHandled(false);
_response.getHttpOutput().reopen(); _response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR); _request.setDispatcherType(DispatcherType.ASYNC);
getServer().handleAsync(this);
break;
Throwable ex=_state.getAsyncContextEvent().getThrowable(); case ASYNC_EXPIRED:
String reason="Async Timeout"; _request.setHandled(false);
if (ex!=null) _response.getHttpOutput().reopen();
_request.setDispatcherType(DispatcherType.ERROR);
Throwable ex=_state.getAsyncContextEvent().getThrowable();
String reason="Async Timeout";
if (ex!=null)
{
reason="Async Exception";
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex);
}
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500));
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE,reason);
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI());
_response.setStatusWithReason(500,reason);
ErrorHandler eh = _state.getContextHandler().getErrorHandler();
if (eh instanceof ErrorHandler.ErrorPageMapper)
{
String error_page=((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
if (error_page!=null)
_state.getAsyncContextEvent().setDispatchPath(error_page);
}
getServer().handleAsync(this);
break;
case READ_CALLBACK:
{ {
reason="Async Exception"; ContextHandler handler=_state.getContextHandler();
_request.setAttribute(RequestDispatcher.ERROR_EXCEPTION,ex); if (handler!=null)
} handler.handle(_request.getHttpInput());
_request.setAttribute(RequestDispatcher.ERROR_STATUS_CODE,new Integer(500)); else
_request.setAttribute(RequestDispatcher.ERROR_MESSAGE,reason); _request.getHttpInput().run();
_request.setAttribute(RequestDispatcher.ERROR_REQUEST_URI,_request.getRequestURI()); break;
_response.setStatusWithReason(500,reason);
ErrorHandler eh = _state.getContextHandler().getErrorHandler();
if (eh instanceof ErrorHandler.ErrorPageMapper)
{
String error_page=((ErrorHandler.ErrorPageMapper)eh).getErrorPage((HttpServletRequest)_state.getAsyncContextEvent().getSuppliedRequest());
if (error_page!=null)
_state.getAsyncContextEvent().setDispatchPath(error_page);
} }
getServer().handleAsync(this); case WRITE_CALLBACK:
break; {
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_response.getHttpOutput());
else
_response.getHttpOutput().run();
break;
}
default:
break loop;
case READ_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_request.getHttpInput());
else
_request.getHttpInput().run();
break;
} }
case WRITE_CALLBACK:
{
ContextHandler handler=_state.getContextHandler();
if (handler!=null)
handler.handle(_response.getHttpOutput());
else
_response.getHttpOutput().run();
break;
}
default:
break loop;
} }
} catch (Error e)
catch (Error e) {
{ if ("ContinuationThrowable".equals(e.getClass().getSimpleName()))
if ("ContinuationThrowable".equals(e.getClass().getSimpleName())) LOG.ignore(e);
LOG.ignore(e); else
else {
error=true;
throw e;
}
}
catch (Exception e)
{ {
error=true; error=true;
throw e; if (e instanceof EofException)
LOG.debug(e);
else
LOG.warn(String.valueOf(_uri), e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
finally
{
if (error && _state.isAsyncStarted())
_state.errorComplete();
action = _state.unhandle();
} }
} }
catch (Exception e)
{
error=true;
if (e instanceof EofException)
LOG.debug(e);
else
LOG.warn(String.valueOf(_uri), e);
_state.error(e);
_request.setHandled(true);
handleException(e);
}
finally
{
if (error && _state.isAsyncStarted())
_state.errorComplete();
action = _state.unhandle();
}
}
if (threadName != null && LOG.isDebugEnabled()) }
Thread.currentThread().setName(threadName); finally
setCurrentHttpChannel(null); {
setCurrentHttpChannel(null);
if (threadName != null && LOG.isDebugEnabled())
Thread.currentThread().setName(threadName);
}
if (action==Action.COMPLETE) if (action==Action.COMPLETE)
{ {

View File

@ -69,9 +69,14 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
return __currentConnection.get(); return __currentConnection.get();
} }
protected static void setCurrentConnection(HttpConnection connection) protected static HttpConnection setCurrentConnection(HttpConnection connection)
{ {
__currentConnection.set(connection); HttpConnection last=__currentConnection.get();
if (connection==null)
__currentConnection.remove();
else
__currentConnection.set(connection);
return last;
} }
public HttpConfiguration getHttpConfiguration() public HttpConfiguration getHttpConfiguration()
@ -182,7 +187,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
{ {
LOG.debug("{} onFillable {}", this, _channel.getState()); LOG.debug("{} onFillable {}", this, _channel.getState());
setCurrentConnection(this); final HttpConnection last=setCurrentConnection(this);
int filled=Integer.MAX_VALUE; int filled=Integer.MAX_VALUE;
boolean suspended=false; boolean suspended=false;
try try
@ -246,7 +251,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
} }
finally finally
{ {
setCurrentConnection(null); setCurrentConnection(last);
if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this) if (!suspended && getEndPoint().isOpen() && getEndPoint().getConnection()==this)
{ {
fillInterested(); fillInterested();