Fix #6227 Async timeout dispatch race (#6228) (#6231)

Fix #6227 Async timeout dispatch race
Only allow the thread calling onTimeout to call dispatch and complete once timeout has expired.
Signed-off-by: Greg Wilkins <gregw@webtide.com>
This commit is contained in:
Greg Wilkins 2021-05-06 07:45:27 +10:00 committed by GitHub
parent 2b78094546
commit 1263732576
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 47 additions and 27 deletions

View File

@ -147,6 +147,7 @@ public class HttpChannelState
private boolean _asyncWritePossible; private boolean _asyncWritePossible;
private long _timeoutMs = DEFAULT_TIMEOUT; private long _timeoutMs = DEFAULT_TIMEOUT;
private AsyncContextEvent _event; private AsyncContextEvent _event;
private Thread _onTimeoutThread;
protected HttpChannelState(HttpChannel channel) protected HttpChannelState(HttpChannel channel)
{ {
@ -573,7 +574,10 @@ public class HttpChannelState
switch (_requestState) switch (_requestState)
{ {
case ASYNC: case ASYNC:
break;
case EXPIRING: case EXPIRING:
if (Thread.currentThread() != _onTimeoutThread)
throw new IllegalStateException(this.getStatusStringLocked());
break; break;
default: default:
throw new IllegalStateException(this.getStatusStringLocked()); throw new IllegalStateException(this.getStatusStringLocked());
@ -637,39 +641,50 @@ public class HttpChannelState
throw new IllegalStateException(toStringLocked()); throw new IllegalStateException(toStringLocked());
event = _event; event = _event;
listeners = _asyncListeners; listeners = _asyncListeners;
_onTimeoutThread = Thread.currentThread();
} }
if (listeners != null) try
{ {
Runnable task = new Runnable() if (listeners != null)
{ {
@Override Runnable task = new Runnable()
public void run()
{ {
for (AsyncListener listener : listeners) @Override
public void run()
{ {
try for (AsyncListener listener : listeners)
{ {
listener.onTimeout(event); try
} {
catch (Throwable x) listener.onTimeout(event);
{ }
if (LOG.isDebugEnabled()) catch (Throwable x)
LOG.warn("{} while invoking onTimeout listener {}", x.toString(), listener, x); {
else if (LOG.isDebugEnabled())
LOG.warn("{} while invoking onTimeout listener {}", x.toString(), listener); LOG.warn("{} while invoking onTimeout listener {}", x.toString(), listener, x);
else
LOG.warn("{} while invoking onTimeout listener {}", x.toString(), listener);
}
} }
} }
}
@Override @Override
public String toString() public String toString()
{ {
return "onTimeout"; return "onTimeout";
} }
}; };
runInContext(event, task); runInContext(event, task);
}
}
finally
{
synchronized (this)
{
_onTimeoutThread = null;
}
} }
} }
@ -686,6 +701,11 @@ public class HttpChannelState
switch (_requestState) switch (_requestState)
{ {
case EXPIRING: case EXPIRING:
if (Thread.currentThread() != _onTimeoutThread)
throw new IllegalStateException(this.getStatusStringLocked());
_requestState = _sendError ? RequestState.BLOCKING : RequestState.COMPLETE;
break;
case ASYNC: case ASYNC:
_requestState = _sendError ? RequestState.BLOCKING : RequestState.COMPLETE; _requestState = _sendError ? RequestState.BLOCKING : RequestState.COMPLETE;
break; break;

View File

@ -230,8 +230,8 @@ public class QoSFilter implements Filter
} }
catch (IllegalStateException x) catch (IllegalStateException x)
{ {
LOG.warn("Unable to resume suspended dispatch", x); if (LOG.isDebugEnabled())
continue; LOG.debug("dispatch failed", x);
} }
} }
} }
@ -356,12 +356,12 @@ public class QoSFilter implements Filter
} }
@Override @Override
public void onStartAsync(AsyncEvent event) throws IOException public void onStartAsync(AsyncEvent event)
{ {
} }
@Override @Override
public void onComplete(AsyncEvent event) throws IOException public void onComplete(AsyncEvent event)
{ {
} }
@ -377,7 +377,7 @@ public class QoSFilter implements Filter
} }
@Override @Override
public void onError(AsyncEvent event) throws IOException public void onError(AsyncEvent event)
{ {
} }
} }