Fix #5605 write side

refactored the complete method to consider unrecoverable API states no matter what the httpout state
actually is.  This avoid duplication of OPEN, CLOSING, CLOSED etc. handling.
This commit is contained in:
gregw 2021-02-10 15:35:48 +01:00
parent ed534b84ef
commit 9f2a4f5ad5
2 changed files with 70 additions and 47 deletions

View File

@ -412,6 +412,37 @@ public class HttpOutput extends ServletOutputStream implements Runnable
ByteBuffer content = null;
synchronized (_channelState)
{
// First check the API state for any unrecoverable situations
switch (_apiState)
{
case UNREADY: // isReady() has returned false so a call to onWritePossible may happen at any time
error = new CancellationException("Completed whilst write unready");
break;
case PENDING: // an async write is pending and may complete at any time
// If this is not the last write, then we must abort
if (!_channel.getResponse().isContentComplete(_written))
error = new CancellationException("Completed whilst write pending");
break;
case BLOCKED: // another thread is blocked in a write or a close
error = new CancellationException("Completed whilst write blocked");
break;
default:
break;
}
// If we can't complete due to the API state, then abort
if (error != null)
{
_writeBlocker.fail(error);
_channel.abort(error);
_state = State.CLOSED;
}
else
{
// Otherwise check the output state to determine how to complete
switch (_state)
{
case CLOSED:
@ -449,30 +480,21 @@ public class HttpOutput extends ServletOutputStream implements Runnable
content = BufferUtil.hasContent(_aggregate) ? _aggregate : BufferUtil.EMPTY_BUFFER;
break;
case BLOCKED:
case UNREADY:
case PENDING:
LOG.warn("Pending write in complete {} {}", this, _channel);
// An operation is in progress, so we soft close now
_softClose = true;
// then trigger a close from onWriteComplete
_state = State.CLOSE;
// But if we are blocked or there is more content to come, we must abort
// Note that this allows a pending async write to complete only if it is the last write
if (_apiState == ApiState.BLOCKED || !_channel.getResponse().isContentComplete(_written))
{
CancellationException cancelled = new CancellationException();
_writeBlocker.fail(cancelled);
_channel.abort(cancelled);
_state = State.CLOSED;
}
break;
default:
throw new IllegalStateException();
}
break;
}
}
}
if (LOG.isDebugEnabled())
LOG.debug("complete({}) {} s={} e={}, c={}", callback, stateString(), succeeded, error, BufferUtil.toDetailString(content));

View File

@ -114,7 +114,8 @@ public class BlockedIOTest extends AbstractTest<TransportScenario>
DeferredContentProvider contentProvider = new DeferredContentProvider();
CountDownLatch ok = new CountDownLatch(2);
scenario.client.POST(scenario.newURI())
scenario.client.newRequest(scenario.newURI())
.method("POST")
.content(contentProvider)
.onResponseContent((response, content) ->
{