Issue #1047 - ReadPendingException and then thread death.
Alternate fix. Tracking needed vs interested state within HttpChannelState rather that ignoring duplicate interest registrations.
This commit is contained in:
parent
5920e14c9b
commit
8ecfbab79f
|
@ -92,6 +92,23 @@ public class HttpChannelState
|
|||
ERRORED // The error has been processed
|
||||
}
|
||||
|
||||
public enum Interest
|
||||
{
|
||||
NONE(false),
|
||||
NEEDED(true),
|
||||
INTERESTED(true);
|
||||
|
||||
final boolean _interested;
|
||||
boolean isInterested() { return _interested;}
|
||||
boolean notInterested() { return !_interested;}
|
||||
|
||||
Interest(boolean interest)
|
||||
{
|
||||
_interested = interest;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private final boolean DEBUG=LOG.isDebugEnabled();
|
||||
private final Locker _locker=new Locker();
|
||||
private final HttpChannel _channel;
|
||||
|
@ -101,7 +118,7 @@ public class HttpChannelState
|
|||
private Async _async;
|
||||
private boolean _initial;
|
||||
private boolean _asyncReadPossible;
|
||||
private boolean _asyncReadUnready;
|
||||
private Interest _asyncRead=Interest.NONE;
|
||||
private boolean _asyncWrite; // TODO refactor same as read
|
||||
private long _timeoutMs=DEFAULT_TIMEOUT;
|
||||
private AsyncContextEvent _event;
|
||||
|
@ -161,9 +178,15 @@ public class HttpChannelState
|
|||
{
|
||||
try(Locker.Lock lock= _locker.lock())
|
||||
{
|
||||
return String.format("%s@%x{s=%s a=%s i=%b r=%s w=%b}",getClass().getSimpleName(),hashCode(),_state,_async,_initial,
|
||||
_asyncReadPossible?(_asyncReadUnready?"PU":"P!U"):(_asyncReadUnready?"!PU":"!P!U"),
|
||||
_asyncWrite);
|
||||
return String.format("%s@%x{s=%s a=%s i=%b r=%s/%s w=%b}",
|
||||
getClass().getSimpleName(),
|
||||
hashCode(),
|
||||
_state,
|
||||
_async,
|
||||
_initial,
|
||||
_asyncRead,
|
||||
_asyncReadPossible,
|
||||
_asyncWrite);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,10 +224,10 @@ public class HttpChannelState
|
|||
return Action.TERMINATED;
|
||||
|
||||
case ASYNC_WOKEN:
|
||||
if (_asyncReadPossible)
|
||||
if (_asyncRead.isInterested() && _asyncReadPossible)
|
||||
{
|
||||
_state=State.ASYNC_IO;
|
||||
_asyncReadUnready=false;
|
||||
_asyncRead=Interest.NONE;
|
||||
return Action.READ_CALLBACK;
|
||||
}
|
||||
|
||||
|
@ -402,10 +425,10 @@ public class HttpChannelState
|
|||
break;
|
||||
|
||||
case STARTED:
|
||||
if (_asyncReadUnready && _asyncReadPossible)
|
||||
if (_asyncRead.isInterested() && _asyncReadPossible)
|
||||
{
|
||||
_state=State.ASYNC_IO;
|
||||
_asyncReadUnready=false;
|
||||
_asyncRead=Interest.NONE;
|
||||
action = Action.READ_CALLBACK;
|
||||
}
|
||||
else if (_asyncWrite) // TODO refactor same as read
|
||||
|
@ -418,8 +441,11 @@ public class HttpChannelState
|
|||
{
|
||||
_state=State.ASYNC_WAIT;
|
||||
action=Action.WAIT;
|
||||
if (_asyncReadUnready)
|
||||
if (_asyncRead==Interest.NEEDED)
|
||||
{
|
||||
_asyncRead=Interest.INTERESTED;
|
||||
read_interested=true;
|
||||
}
|
||||
Scheduler scheduler=_channel.getScheduler();
|
||||
if (scheduler!=null && _timeoutMs>0)
|
||||
_event.setTimeoutTask(scheduler.schedule(_event,_timeoutMs,TimeUnit.MILLISECONDS));
|
||||
|
@ -754,7 +780,8 @@ public class HttpChannelState
|
|||
_state=State.IDLE;
|
||||
_async=Async.NOT_ASYNC;
|
||||
_initial=true;
|
||||
_asyncReadPossible=_asyncReadUnready=false;
|
||||
_asyncReadPossible=false;
|
||||
_asyncRead=Interest.NONE;
|
||||
_asyncWrite=false;
|
||||
_timeoutMs=DEFAULT_TIMEOUT;
|
||||
_event=null;
|
||||
|
@ -778,7 +805,8 @@ public class HttpChannelState
|
|||
_state=State.UPGRADED;
|
||||
_async=Async.NOT_ASYNC;
|
||||
_initial=true;
|
||||
_asyncReadPossible=_asyncReadUnready=false;
|
||||
_asyncReadPossible=false;
|
||||
_asyncRead=Interest.NONE;
|
||||
_asyncWrite=false;
|
||||
_timeoutMs=DEFAULT_TIMEOUT;
|
||||
_event=null;
|
||||
|
@ -958,12 +986,16 @@ public class HttpChannelState
|
|||
try(Locker.Lock lock= _locker.lock())
|
||||
{
|
||||
// We were already unready, this is not a state change, so do nothing
|
||||
if (!_asyncReadUnready)
|
||||
if (_asyncRead!=Interest.INTERESTED)
|
||||
{
|
||||
_asyncReadUnready=true;
|
||||
_asyncReadPossible=false; // Assumes this has been checked in isReady() with lock held
|
||||
if (_state==State.ASYNC_WAIT)
|
||||
{
|
||||
interested=true;
|
||||
_asyncRead=Interest.INTERESTED;
|
||||
}
|
||||
else
|
||||
_asyncRead=Interest.NEEDED;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -984,7 +1016,7 @@ public class HttpChannelState
|
|||
try(Locker.Lock lock= _locker.lock())
|
||||
{
|
||||
_asyncReadPossible=true;
|
||||
if (_state==State.ASYNC_WAIT && _asyncReadUnready)
|
||||
if (_state==State.ASYNC_WAIT && _asyncRead.isInterested())
|
||||
{
|
||||
woken=true;
|
||||
_state=State.ASYNC_WOKEN;
|
||||
|
@ -1005,7 +1037,7 @@ public class HttpChannelState
|
|||
boolean woken=false;
|
||||
try(Locker.Lock lock= _locker.lock())
|
||||
{
|
||||
_asyncReadUnready=true;
|
||||
_asyncRead=Interest.INTERESTED;
|
||||
_asyncReadPossible=true;
|
||||
if (_state==State.ASYNC_WAIT)
|
||||
{
|
||||
|
|
|
@ -546,7 +546,7 @@ public class HttpConnection extends AbstractConnection implements Runnable, Http
|
|||
|
||||
public void asyncReadFillInterested()
|
||||
{
|
||||
getEndPoint().tryFillInterested(_asyncReadCallback);
|
||||
getEndPoint().fillInterested(_asyncReadCallback);
|
||||
}
|
||||
|
||||
public void blockingReadFillInterested()
|
||||
|
|
|
@ -980,7 +980,7 @@ public class AsyncIOServletTest extends AbstractTest
|
|||
while (input.isReady() && !input.isFinished())
|
||||
{
|
||||
int read = input.read();
|
||||
System.err.printf("%x%n", read);
|
||||
// System.err.printf("%x%n", read);
|
||||
readLatch.countDown();
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue