Merge remote-tracking branch 'origin/jetty-9.4.x-5605-wakeup-blocked-threads' into jetty-10.0.x-5605-wakeup-blocked-threads

This commit is contained in:
Ludovic Orban 2021-02-10 15:53:38 +01:00
commit 03e2789699
2 changed files with 70 additions and 47 deletions

View File

@ -397,65 +397,87 @@ public class HttpOutput extends ServletOutputStream implements Runnable
ByteBuffer content = null; ByteBuffer content = null;
try (AutoLock l = _channelState.lock()) try (AutoLock l = _channelState.lock())
{ {
switch (_state) // First check the API state for any unrecoverable situations
switch (_apiState)
{ {
case CLOSED: case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time
succeeded = true; error = new CancellationException("Completed whilst write unready");
break; break;
case CLOSE: case PENDING: // an async write is pending and may complete at any time
case CLOSING: // If this is not the last write, then we must abort
_closedCallback = Callback.combine(_closedCallback, callback); if (!_channel.getResponse().isContentComplete(_written))
error = new CancellationException("Completed whilst write pending");
break; break;
case OPEN: case BLOCKED: // another thread is blocked in a write or a close
if (_onError != null) error = new CancellationException("Completed whilst write blocked");
{ break;
error = _onError;
default:
break;
}
// If we can't complete due to the API state, then abort
if (error != null)
{
_writeBlocker.fail(error);
_channel.abort(error);
_state = State.CLOSED;
}
else
{
// Otherwise check the output state to determine how to complete
switch (_state)
{
case CLOSED:
succeeded = true;
break; break;
}
_closedCallback = Callback.combine(_closedCallback, callback); case CLOSE:
case CLOSING:
_closedCallback = Callback.combine(_closedCallback, callback);
break;
switch (_apiState) case OPEN:
{ if (_onError != null)
case BLOCKING: {
// Output is idle blocking state, but we still do an async close error = _onError;
_apiState = ApiState.BLOCKED;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break; break;
}
case ASYNC: _closedCallback = Callback.combine(_closedCallback, callback);
case READY:
// Output is idle in async state, so we can do an async close
_apiState = ApiState.PENDING;
_state = State.CLOSING;
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
case BLOCKED: switch (_apiState)
case UNREADY: {
case PENDING: case BLOCKING:
LOG.warn("Pending write in complete {} {}", this, _channel); // Output is idle blocking state, but we still do an async close
// An operation is in progress, so we soft close now _apiState = ApiState.BLOCKED;
_softClose = true; _state = State.CLOSING;
// then trigger a close from onWriteComplete content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
_state = State.CLOSE; break;
// But if we are blocked or there is more content to come, we must abort case ASYNC:
// Note that this allows a pending async write to complete only if it is the last write case READY:
if (_apiState == ApiState.BLOCKED || !_channel.getResponse().isContentComplete(_written)) // Output is idle in async state, so we can do an async close
{ _apiState = ApiState.PENDING;
CancellationException cancelled = new CancellationException(); _state = State.CLOSING;
_writeBlocker.fail(cancelled); content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
_channel.abort(cancelled); break;
_state = State.CLOSED;
}
break; case UNREADY:
} case PENDING:
break; // An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
break;
default:
throw new IllegalStateException();
}
break;
}
} }
} }

View File

@ -109,7 +109,8 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
DeferredContentProvider contentProvider = new DeferredContentProvider(); DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch ok = new CountDownLatch(2); CountDownLatch ok = new CountDownLatch(2);
scenario.client.POST(scenario.newURI()) scenario.client.newRequest(scenario.newURI())
.method("POST")
.content(contentProvider) .content(contentProvider)
.onResponseContent((response, content) -> .onResponseContent((response, content) ->
{ {