Issue #1418 updates after review
This commit is contained in:
parent
ea39cbef57
commit
7a6f2860e1
|
@ -106,6 +106,11 @@ public class AsyncContextEvent extends AsyncEvent implements Runnable
|
|||
_timeoutTask = task;
|
||||
}
|
||||
|
||||
public boolean hasTimeoutTask()
|
||||
{
|
||||
return _timeoutTask!=null;
|
||||
}
|
||||
|
||||
public void cancelTimeoutTask()
|
||||
{
|
||||
Scheduler.Task task=_timeoutTask;
|
||||
|
|
|
@ -78,7 +78,7 @@ public class HttpChannelState
|
|||
ERROR_DISPATCH, // handle a normal error
|
||||
ASYNC_ERROR, // handle an async error
|
||||
WRITE_CALLBACK, // handle an IO write callback
|
||||
READ_PRODUCE, // Check is a read is possible by parsing/filling
|
||||
READ_PRODUCE, // Check is a read is possible by parsing/filling
|
||||
READ_CALLBACK, // handle an IO read callback
|
||||
COMPLETE, // Complete the response
|
||||
TERMINATED, // No further actions
|
||||
|
@ -102,13 +102,13 @@ public class HttpChannelState
|
|||
|
||||
private enum AsyncRead
|
||||
{
|
||||
NONE, // No isReady; No data
|
||||
AVAILABLE, // No isReady; onDataAvailable
|
||||
NEEDED, // isReady()==false handling; No data
|
||||
IDLE, // No isReady; No data
|
||||
AVAILABLE, // No isReady; onContentAdded has been called
|
||||
REGISTER, // isReady()==false handling; No data
|
||||
REGISTERED, // isReady()==false !handling; No data
|
||||
POSSIBLE, // isReady()==false async callback called (http/1 only)
|
||||
PRODUCING, // isReady()==false handling content production (http/1 only)
|
||||
READY // isReady() was false, data now available
|
||||
POSSIBLE, // isReady()==false async read callback called (http/1 only)
|
||||
PRODUCING, // isReady()==false READ_PRODUCE action is being handled (http/1 only)
|
||||
READY // isReady() was false, onContentAdded has been called
|
||||
}
|
||||
|
||||
private final Locker _locker=new Locker();
|
||||
|
@ -117,7 +117,7 @@ public class HttpChannelState
|
|||
private State _state;
|
||||
private Async _async;
|
||||
private boolean _initial;
|
||||
private AsyncRead _asyncRead=AsyncRead.NONE;
|
||||
private AsyncRead _asyncRead=AsyncRead.IDLE;
|
||||
private boolean _asyncWritePossible;
|
||||
private long _timeoutMs=DEFAULT_TIMEOUT;
|
||||
private AsyncContextEvent _event;
|
||||
|
@ -237,9 +237,14 @@ public class HttpChannelState
|
|||
return Action.READ_PRODUCE;
|
||||
case READY:
|
||||
_state=State.ASYNC_IO;
|
||||
_asyncRead=AsyncRead.NONE;
|
||||
_asyncRead=AsyncRead.IDLE;
|
||||
return Action.READ_CALLBACK;
|
||||
default:
|
||||
case REGISTER:
|
||||
case PRODUCING:
|
||||
throw new IllegalStateException(toStringLocked());
|
||||
case IDLE:
|
||||
case AVAILABLE:
|
||||
case REGISTERED:
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -386,7 +391,6 @@ public class HttpChannelState
|
|||
*/
|
||||
protected Action unhandle()
|
||||
{
|
||||
Action action;
|
||||
boolean read_interested = false;
|
||||
|
||||
try(Locker.Lock lock= _locker.lock())
|
||||
|
@ -414,41 +418,38 @@ public class HttpChannelState
|
|||
}
|
||||
|
||||
_initial=false;
|
||||
async: switch(_async)
|
||||
switch(_async)
|
||||
{
|
||||
case COMPLETE:
|
||||
_state=State.COMPLETING;
|
||||
_async=Async.NOT_ASYNC;
|
||||
action=Action.COMPLETE;
|
||||
break;
|
||||
return Action.COMPLETE;
|
||||
|
||||
case DISPATCH:
|
||||
_state=State.DISPATCHED;
|
||||
_async=Async.NOT_ASYNC;
|
||||
action=Action.ASYNC_DISPATCH;
|
||||
break;
|
||||
return Action.ASYNC_DISPATCH;
|
||||
|
||||
case STARTED:
|
||||
switch(_asyncRead)
|
||||
{
|
||||
case READY:
|
||||
_state=State.ASYNC_IO;
|
||||
_asyncRead=AsyncRead.NONE;
|
||||
action=Action.READ_CALLBACK;
|
||||
break async;
|
||||
_asyncRead=AsyncRead.IDLE;
|
||||
return Action.READ_CALLBACK;
|
||||
|
||||
case POSSIBLE:
|
||||
_state=State.ASYNC_IO;
|
||||
_asyncRead=AsyncRead.PRODUCING;
|
||||
action=Action.READ_PRODUCE;
|
||||
break async;
|
||||
return Action.READ_PRODUCE;
|
||||
|
||||
case NEEDED:
|
||||
case REGISTER:
|
||||
case PRODUCING:
|
||||
_asyncRead=AsyncRead.REGISTERED;
|
||||
read_interested=true;
|
||||
|
||||
case NONE:
|
||||
break;
|
||||
|
||||
case IDLE:
|
||||
case AVAILABLE:
|
||||
case REGISTERED:
|
||||
break;
|
||||
|
@ -458,54 +459,50 @@ public class HttpChannelState
|
|||
{
|
||||
_state=State.ASYNC_IO;
|
||||
_asyncWritePossible=false;
|
||||
action=Action.WRITE_CALLBACK;
|
||||
return Action.WRITE_CALLBACK;
|
||||
}
|
||||
else
|
||||
{
|
||||
_state=State.ASYNC_WAIT;
|
||||
action=Action.WAIT;
|
||||
|
||||
Scheduler scheduler=_channel.getScheduler();
|
||||
if (scheduler!=null && _timeoutMs>0)
|
||||
if (scheduler!=null && _timeoutMs>0 && !_event.hasTimeoutTask())
|
||||
_event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
|
||||
|
||||
return Action.WAIT;
|
||||
}
|
||||
break;
|
||||
|
||||
case EXPIRING:
|
||||
// onTimeout callbacks still being called, so just WAIT
|
||||
_state=State.ASYNC_WAIT;
|
||||
action=Action.WAIT;
|
||||
break;
|
||||
return Action.WAIT;
|
||||
|
||||
case EXPIRED:
|
||||
// onTimeout handling is complete, but did not dispatch as
|
||||
// we were handling. So do the error dispatch here
|
||||
_state=State.DISPATCHED;
|
||||
_async=Async.NOT_ASYNC;
|
||||
action=Action.ERROR_DISPATCH;
|
||||
break;
|
||||
return Action.ERROR_DISPATCH;
|
||||
|
||||
case ERRORED:
|
||||
_state=State.DISPATCHED;
|
||||
_async=Async.NOT_ASYNC;
|
||||
action=Action.ERROR_DISPATCH;
|
||||
break;
|
||||
return Action.ERROR_DISPATCH;
|
||||
|
||||
case NOT_ASYNC:
|
||||
_state=State.COMPLETING;
|
||||
action=Action.COMPLETE;
|
||||
break;
|
||||
return Action.COMPLETE;
|
||||
|
||||
default:
|
||||
_state=State.COMPLETING;
|
||||
action=Action.COMPLETE;
|
||||
break;
|
||||
return Action.COMPLETE;
|
||||
}
|
||||
}
|
||||
|
||||
if (read_interested)
|
||||
_channel.asyncReadFillInterested();
|
||||
|
||||
return action;
|
||||
finally
|
||||
{
|
||||
if (read_interested)
|
||||
_channel.asyncReadFillInterested();
|
||||
}
|
||||
}
|
||||
|
||||
public void dispatch(ServletContext context, String path)
|
||||
|
@ -931,7 +928,7 @@ public class HttpChannelState
|
|||
_state=State.IDLE;
|
||||
_async=Async.NOT_ASYNC;
|
||||
_initial=true;
|
||||
_asyncRead=AsyncRead.NONE;
|
||||
_asyncRead=AsyncRead.IDLE;
|
||||
_asyncWritePossible=false;
|
||||
_timeoutMs=DEFAULT_TIMEOUT;
|
||||
_event=null;
|
||||
|
@ -958,7 +955,7 @@ public class HttpChannelState
|
|||
_state=State.UPGRADED;
|
||||
_async=Async.NOT_ASYNC;
|
||||
_initial=true;
|
||||
_asyncRead=AsyncRead.NONE;
|
||||
_asyncRead=AsyncRead.IDLE;
|
||||
_asyncWritePossible=false;
|
||||
_timeoutMs=DEFAULT_TIMEOUT;
|
||||
_event=null;
|
||||
|
@ -1149,10 +1146,7 @@ public class HttpChannelState
|
|||
|
||||
switch(_asyncRead)
|
||||
{
|
||||
case NONE:
|
||||
case AVAILABLE:
|
||||
case READY:
|
||||
case NEEDED:
|
||||
case IDLE:
|
||||
if (_state==State.ASYNC_WAIT)
|
||||
{
|
||||
interested=true;
|
||||
|
@ -1160,16 +1154,18 @@ public class HttpChannelState
|
|||
}
|
||||
else
|
||||
{
|
||||
_asyncRead=AsyncRead.NEEDED;
|
||||
_asyncRead=AsyncRead.REGISTER;
|
||||
}
|
||||
break;
|
||||
|
||||
case REGISTER:
|
||||
case REGISTERED:
|
||||
case POSSIBLE:
|
||||
case PRODUCING:
|
||||
break;
|
||||
|
||||
default:
|
||||
case AVAILABLE:
|
||||
case READY:
|
||||
throw new IllegalStateException(toStringLocked());
|
||||
}
|
||||
}
|
||||
|
@ -1185,31 +1181,30 @@ public class HttpChannelState
|
|||
* is returned.
|
||||
* @return True IFF the channel was unready and in ASYNC_WAIT state
|
||||
*/
|
||||
public boolean onDataAvailable()
|
||||
public boolean onContentAdded()
|
||||
{
|
||||
boolean woken=false;
|
||||
try(Locker.Lock lock= _locker.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onReadPossible {}",toStringLocked());
|
||||
LOG.debug("onContentAdded {}",toStringLocked());
|
||||
|
||||
switch(_asyncRead)
|
||||
{
|
||||
case NONE:
|
||||
case IDLE:
|
||||
_asyncRead=AsyncRead.AVAILABLE;
|
||||
break;
|
||||
|
||||
case AVAILABLE:
|
||||
case READY:
|
||||
break;
|
||||
|
||||
case PRODUCING:
|
||||
_asyncRead=AsyncRead.READY;
|
||||
break;
|
||||
|
||||
case NEEDED:
|
||||
case REGISTER:
|
||||
case REGISTERED:
|
||||
case POSSIBLE:
|
||||
case READY:
|
||||
_asyncRead=AsyncRead.READY;
|
||||
if (_state==State.ASYNC_WAIT)
|
||||
{
|
||||
|
@ -1217,8 +1212,8 @@ public class HttpChannelState
|
|||
_state=State.ASYNC_WOKEN;
|
||||
}
|
||||
break;
|
||||
|
||||
default:
|
||||
|
||||
case POSSIBLE:
|
||||
throw new IllegalStateException(toStringLocked());
|
||||
}
|
||||
}
|
||||
|
@ -1228,7 +1223,7 @@ public class HttpChannelState
|
|||
/**
|
||||
* Called to signal that the channel is ready for a callback.
|
||||
* This is similar to calling {@link #onReadUnready()} followed by
|
||||
* {@link #onDataAvailable()}, except that as content is already
|
||||
* {@link #onContentAdded()}, except that as content is already
|
||||
* available, read interest is never set.
|
||||
* @return true if woken
|
||||
*/
|
||||
|
@ -1242,7 +1237,7 @@ public class HttpChannelState
|
|||
|
||||
switch(_asyncRead)
|
||||
{
|
||||
case NONE:
|
||||
case IDLE:
|
||||
case AVAILABLE:
|
||||
_asyncRead=AsyncRead.READY;
|
||||
if (_state==State.ASYNC_WAIT)
|
||||
|
@ -1290,6 +1285,21 @@ public class HttpChannelState
|
|||
return woken;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called to signal that application has called read.
|
||||
* @return true if woken
|
||||
*/
|
||||
public void onRead(boolean available)
|
||||
{
|
||||
try(Locker.Lock lock= _locker.lock())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("onRead {} {}",available,toStringLocked());
|
||||
|
||||
_asyncRead=available?AsyncRead.AVAILABLE:AsyncRead.IDLE;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called to signal that a read has read -1.
|
||||
* Will wake if the read was called while in ASYNC_WAIT state
|
||||
|
|
|
@ -260,9 +260,9 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
int l;
|
||||
synchronized (_inputQ)
|
||||
{
|
||||
// Setup blocking only if not async
|
||||
if (!isAsync())
|
||||
{
|
||||
// Setup blocking only if not async
|
||||
if (_blockUntil == 0)
|
||||
{
|
||||
long blockingTimeout = getBlockingTimeout();
|
||||
|
@ -296,7 +296,9 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
|
||||
// Consume any following poison pills
|
||||
if (item.isEmpty())
|
||||
nextInterceptedContent();
|
||||
_channelState.onRead(nextInterceptedContent()!=null);
|
||||
else
|
||||
_channelState.onRead(true);
|
||||
break;
|
||||
}
|
||||
|
||||
|
@ -305,9 +307,10 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
{
|
||||
// Not blocking, so what should we return?
|
||||
l = _state.noContent();
|
||||
_channelState.onRead(false);
|
||||
|
||||
// If EOF do we need to wake for allDataRead callback?
|
||||
if (l<0)
|
||||
// If EOF do we need to wake for allDataRead callback?
|
||||
wake = _channelState.onReadEof();
|
||||
break;
|
||||
}
|
||||
|
@ -577,7 +580,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
if (_listener == null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken = _channelState.onDataAvailable();
|
||||
woken = _channelState.onContentAdded();
|
||||
}
|
||||
return woken;
|
||||
}
|
||||
|
@ -612,7 +615,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
if (_listener == null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken = _channelState.onDataAvailable();
|
||||
woken = _channelState.onContentAdded();
|
||||
}
|
||||
}
|
||||
return woken;
|
||||
|
@ -800,7 +803,7 @@ public class HttpInput extends ServletInputStream implements Runnable
|
|||
if (_listener == null)
|
||||
_inputQ.notify();
|
||||
else
|
||||
woken = _channelState.onDataAvailable();
|
||||
woken = _channelState.onContentAdded();
|
||||
}
|
||||
|
||||
return woken;
|
||||
|
|
|
@ -120,9 +120,9 @@ public class HttpInputAsyncStateTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onDataAvailable()
|
||||
public boolean onContentAdded()
|
||||
{
|
||||
boolean wake = super.onDataAvailable();
|
||||
boolean wake = super.onContentAdded();
|
||||
__history.add("onReadPossible "+wake);
|
||||
return wake;
|
||||
}
|
||||
|
|
|
@ -111,10 +111,10 @@ public class HttpInputTest
|
|||
}
|
||||
|
||||
@Override
|
||||
public boolean onDataAvailable()
|
||||
public boolean onContentAdded()
|
||||
{
|
||||
_history.add("s.onDataAvailable");
|
||||
return super.onDataAvailable();
|
||||
return super.onContentAdded();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
Loading…
Reference in New Issue