Issue #5691 - HttpInput may skip setting fill interest.

Code cleanups and logging improvements.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-11-20 16:50:24 +01:00
parent 7726c2ebcb
commit 8edb5cfc24
4 changed files with 76 additions and 63 deletions

View File

@ -175,6 +175,8 @@ public class ByteArrayEndPoint extends AbstractEndPoint
throw new ClosedChannelException();
ByteBuffer in = _inQ.peek();
if (LOG.isDebugEnabled())
LOG.debug("{} needsFillInterest EOF={} {}", this, in == EOF, BufferUtil.toDetailString(in));
if (BufferUtil.hasContent(in) || isEOF(in))
execute(_runFillable);
}
@ -201,11 +203,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint
boolean wasEmpty = _inQ.isEmpty();
if (in == null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addEOFAndRun=true", this);
_inQ.add(EOF);
fillable = true;
}
if (BufferUtil.hasContent(in))
{
if (LOG.isDebugEnabled())
LOG.debug("{} addInputAndRun={} {}", this, wasEmpty, BufferUtil.toDetailString(in));
_inQ.add(in);
fillable = wasEmpty;
}
@ -234,11 +240,15 @@ public class ByteArrayEndPoint extends AbstractEndPoint
boolean wasEmpty = _inQ.isEmpty();
if (in == null)
{
if (LOG.isDebugEnabled())
LOG.debug("{} addEOFAndExecute=true", this);
_inQ.add(EOF);
fillable = true;
}
if (BufferUtil.hasContent(in))
{
if (LOG.isDebugEnabled())
LOG.debug("{} addInputAndExecute={} {}", this, wasEmpty, BufferUtil.toDetailString(in));
_inQ.add(in);
fillable = wasEmpty;
}

View File

@ -77,7 +77,7 @@ class AsyncContentProducer implements ContentProducer
HttpInput.Content content = nextTransformedContent();
int available = content == null ? 0 : content.remaining();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
LOG.debug("available = {} {}", available, this);
return available;
}
@ -86,7 +86,7 @@ class AsyncContentProducer implements ContentProducer
{
boolean hasContent = _rawContent != null;
if (LOG.isDebugEnabled())
LOG.debug("hasContent = {}", hasContent);
LOG.debug("hasContent = {} {}", hasContent, this);
return hasContent;
}
@ -94,7 +94,7 @@ class AsyncContentProducer implements ContentProducer
public boolean isError()
{
if (LOG.isDebugEnabled())
LOG.debug("isError = {}", _error);
LOG.debug("isError = {} {}", _error, this);
return _error;
}
@ -103,7 +103,7 @@ class AsyncContentProducer implements ContentProducer
{
long minRequestDataRate = _httpChannel.getHttpConfiguration().getMinRequestDataRate();
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate [m={},t={}]", minRequestDataRate, _firstByteTimeStamp);
LOG.debug("checkMinDataRate [m={},t={}] {}", minRequestDataRate, _firstByteTimeStamp, this);
if (minRequestDataRate > 0 && _firstByteTimeStamp != Long.MIN_VALUE)
{
long period = System.nanoTime() - _firstByteTimeStamp;
@ -113,13 +113,13 @@ class AsyncContentProducer implements ContentProducer
if (getRawContentArrived() < minimumData)
{
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate check failed");
LOG.debug("checkMinDataRate check failed {}", this);
BadMessageException bad = new BadMessageException(HttpStatus.REQUEST_TIMEOUT_408,
String.format("Request content data rate < %d B/s", minRequestDataRate));
if (_httpChannel.getState().isResponseCommitted())
{
if (LOG.isDebugEnabled())
LOG.debug("checkMinDataRate aborting channel");
LOG.debug("checkMinDataRate aborting channel {}", this);
_httpChannel.abort(bad);
}
failCurrentContent(bad);
@ -133,7 +133,7 @@ class AsyncContentProducer implements ContentProducer
public long getRawContentArrived()
{
if (LOG.isDebugEnabled())
LOG.debug("getRawContentArrived = {}", _rawContentArrived);
LOG.debug("getRawContentArrived = {} {}", _rawContentArrived, this);
return _rawContentArrived;
}
@ -141,7 +141,7 @@ class AsyncContentProducer implements ContentProducer
public boolean consumeAll(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("consumeAll [e={}]", (Object)x);
LOG.debug("consumeAll [e={}] {}", x, this);
failCurrentContent(x);
// A specific HttpChannel mechanism must be used as the following code
// does not guarantee that the channel will synchronously deliver all
@ -156,14 +156,14 @@ class AsyncContentProducer implements ContentProducer
// deliver the content asynchronously. Tests in StreamResetTest cover this.
boolean atEof = _httpChannel.failAllContent(x);
if (LOG.isDebugEnabled())
LOG.debug("failed all content of http channel; at EOF? {}", atEof);
LOG.debug("failed all content of http channel EOF={} {}", atEof, this);
return atEof;
}
private void failCurrentContent(Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("failing currently held content [r={},t={}]", _rawContent, _transformedContent, x);
LOG.debug("failing currently held content {}", this, x);
if (_transformedContent != null && !_transformedContent.isSpecial())
{
if (_transformedContent != _rawContent)
@ -186,7 +186,7 @@ class AsyncContentProducer implements ContentProducer
public boolean onContentProducible()
{
if (LOG.isDebugEnabled())
LOG.debug("onContentProducible");
LOG.debug("onContentProducible {}", this);
return _httpChannel.getState().onReadReady();
}
@ -195,7 +195,7 @@ class AsyncContentProducer implements ContentProducer
{
HttpInput.Content content = nextTransformedContent();
if (LOG.isDebugEnabled())
LOG.debug("nextContent = {}", content);
LOG.debug("nextContent = {} {}", content, this);
if (content != null)
_httpChannel.getState().onReadIdle();
return content;
@ -205,7 +205,7 @@ class AsyncContentProducer implements ContentProducer
public void reclaim(HttpInput.Content content)
{
if (LOG.isDebugEnabled())
LOG.debug("reclaim {} [t={}]", content, _transformedContent);
LOG.debug("reclaim {} {}", content, this);
if (_transformedContent == content)
{
content.succeeded();
@ -239,7 +239,7 @@ class AsyncContentProducer implements ContentProducer
else
{
if (LOG.isDebugEnabled())
LOG.debug("isReady got transformed content {}", content);
LOG.debug("isReady got transformed content {} {}", content, this);
_httpChannel.getState().onContentAdded();
}
boolean ready = content != null;
@ -251,7 +251,7 @@ class AsyncContentProducer implements ContentProducer
private HttpInput.Content nextTransformedContent()
{
if (LOG.isDebugEnabled())
LOG.debug("nextTransformedContent [r={},t={}]", _rawContent, _transformedContent);
LOG.debug("nextTransformedContent {}", this);
if (_rawContent == null)
{
_rawContent = produceRawContent();
@ -264,7 +264,7 @@ class AsyncContentProducer implements ContentProducer
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted transformed content");
LOG.debug("nulling depleted transformed content {}", this);
_transformedContent = null;
}
@ -276,20 +276,20 @@ class AsyncContentProducer implements ContentProducer
_error = _rawContent.getError() != null;
if (LOG.isDebugEnabled())
LOG.debug("raw content is special (with error = {}), returning it", _error);
LOG.debug("raw content is special (with error = {}), returning it {}", _error, this);
return _rawContent;
}
if (_interceptor != null)
{
if (LOG.isDebugEnabled())
LOG.debug("using interceptor {} to transform raw content", _interceptor);
LOG.debug("using interceptor to transform raw content {}", this);
_transformedContent = _interceptor.readFrom(_rawContent);
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("null interceptor, transformed content = raw content");
LOG.debug("null interceptor, transformed content = raw content {}", this);
_transformedContent = _rawContent;
}
@ -298,7 +298,7 @@ class AsyncContentProducer implements ContentProducer
if (_transformedContent != _rawContent)
_transformedContent.succeeded();
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted transformed content");
LOG.debug("nulling depleted transformed content {}", this);
_transformedContent = null;
}
@ -309,30 +309,30 @@ class AsyncContentProducer implements ContentProducer
_rawContent.succeeded();
_rawContent = null;
if (LOG.isDebugEnabled())
LOG.debug("nulling depleted raw content");
LOG.debug("nulling depleted raw content {}", this);
_rawContent = produceRawContent();
if (_rawContent == null)
{
if (LOG.isDebugEnabled())
LOG.debug("produced null raw content, returning null");
LOG.debug("produced null raw content, returning null, {}", this);
return null;
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("raw content is not empty");
LOG.debug("raw content is not empty {}", this);
}
}
else
{
if (LOG.isDebugEnabled())
LOG.debug("transformed content is not empty");
LOG.debug("transformed content is not empty {}", this);
}
}
if (LOG.isDebugEnabled())
LOG.debug("returning transformed content {}", _transformedContent);
LOG.debug("returning transformed content {}", this);
return _transformedContent;
}
@ -345,10 +345,24 @@ class AsyncContentProducer implements ContentProducer
if (_firstByteTimeStamp == Long.MIN_VALUE)
_firstByteTimeStamp = System.nanoTime();
if (LOG.isDebugEnabled())
LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {}", _rawContentArrived, _firstByteTimeStamp);
LOG.debug("produceRawContent updated rawContentArrived to {} and firstByteTimeStamp to {} {}", _rawContentArrived, _firstByteTimeStamp, this);
}
if (LOG.isDebugEnabled())
LOG.debug("produceRawContent produced {}", content);
LOG.debug("produceRawContent produced {} {}", content, this);
return content;
}
@Override
public String toString()
{
return String.format("%s@%x[r=%s,t=%s,i=%s,error=%b,c=%s]",
getClass().getSimpleName(),
hashCode(),
_rawContent,
_transformedContent,
_interceptor,
_error,
_httpChannel
);
}
}

View File

@ -83,7 +83,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public void setInterceptor(Interceptor interceptor)
{
if (LOG.isDebugEnabled())
LOG.debug("setting interceptor to {}", interceptor);
LOG.debug("setting interceptor to {} on {}", interceptor, this);
_contentProducer.setInterceptor(interceptor);
}
@ -99,14 +99,14 @@ public class HttpInput extends ServletInputStream implements Runnable
if (currentInterceptor == null)
{
if (LOG.isDebugEnabled())
LOG.debug("adding single interceptor: {}", interceptor);
LOG.debug("adding single interceptor: {} on {}", interceptor, this);
_contentProducer.setInterceptor(interceptor);
}
else
{
ChainedInterceptor chainedInterceptor = new ChainedInterceptor(currentInterceptor, interceptor);
if (LOG.isDebugEnabled())
LOG.debug("adding chained interceptor: {}", chainedInterceptor);
LOG.debug("adding chained interceptor: {} on {}", chainedInterceptor, this);
_contentProducer.setInterceptor(chainedInterceptor);
}
}
@ -131,7 +131,7 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean consumeAll()
{
if (LOG.isDebugEnabled())
LOG.debug("consume all");
LOG.debug("consumeAll {}", this);
boolean atEof = _contentProducer.consumeAll(new IOException("Unconsumed content"));
if (atEof)
_consumedEof = true;
@ -146,14 +146,14 @@ public class HttpInput extends ServletInputStream implements Runnable
{
boolean error = _contentProducer.isError();
if (LOG.isDebugEnabled())
LOG.debug("isError = {}", error);
LOG.debug("isError={} {}", error, this);
return error;
}
public boolean isAsync()
{
if (LOG.isDebugEnabled())
LOG.debug("isAsync read listener = " + _readListener);
LOG.debug("isAsync read listener {} {}", _readListener, this);
return _readListener != null;
}
@ -164,7 +164,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
boolean finished = _consumedEof;
if (LOG.isDebugEnabled())
LOG.debug("isFinished? {}", finished);
LOG.debug("isFinished={} {}", finished, this);
return finished;
}
@ -172,23 +172,16 @@ public class HttpInput extends ServletInputStream implements Runnable
public boolean isReady()
{
boolean ready = _contentProducer.isReady();
if (!ready)
{
if (LOG.isDebugEnabled())
LOG.debug("isReady? false");
return false;
}
if (LOG.isDebugEnabled())
LOG.debug("isReady? true");
return true;
LOG.debug("isReady={} {}", ready, this);
return ready;
}
@Override
public void setReadListener(ReadListener readListener)
{
if (LOG.isDebugEnabled())
LOG.debug("setting read listener to {}", readListener);
LOG.debug("setting read listener to {} {}", readListener, this);
if (_readListener != null)
throw new IllegalStateException("ReadListener already set");
_readListener = Objects.requireNonNull(readListener);
@ -229,7 +222,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
int read = get(content, b, off, len);
if (LOG.isDebugEnabled())
LOG.debug("read produced {} byte(s)", read);
LOG.debug("read produced {} byte(s) {}", read, this);
if (content.isEmpty())
_contentProducer.reclaim(content);
return read;
@ -237,7 +230,7 @@ public class HttpInput extends ServletInputStream implements Runnable
Throwable error = content.getError();
if (LOG.isDebugEnabled())
LOG.debug("read error = " + error);
LOG.debug("read error={} {}", error, this);
if (error != null)
{
if (error instanceof IOException)
@ -248,7 +241,7 @@ public class HttpInput extends ServletInputStream implements Runnable
if (content.isEof())
{
if (LOG.isDebugEnabled())
LOG.debug("read at EOF, setting consumed EOF to true");
LOG.debug("read at EOF, setting consumed EOF to true {}", this);
_consumedEof = true;
// If EOF do we need to wake for allDataRead callback?
if (onContentProducible())
@ -276,7 +269,7 @@ public class HttpInput extends ServletInputStream implements Runnable
// which is forbidden by this method's contract.
boolean hasContent = _contentProducer.hasContent();
if (LOG.isDebugEnabled())
LOG.debug("hasContent = {}", hasContent);
LOG.debug("hasContent={} {}", hasContent, this);
return hasContent;
}
@ -285,7 +278,7 @@ public class HttpInput extends ServletInputStream implements Runnable
{
int available = _contentProducer.available();
if (LOG.isDebugEnabled())
LOG.debug("available = {}", available);
LOG.debug("available={} {}", available, this);
return available;
}
@ -300,17 +293,13 @@ public class HttpInput extends ServletInputStream implements Runnable
{
Content content = _contentProducer.nextContent();
if (LOG.isDebugEnabled())
LOG.debug("running on content {}", content);
// The nextContent() call could return null if the transformer ate all
// the raw bytes without producing any transformed content.
if (content == null)
return;
LOG.debug("running on content {} {}", content, this);
// This check is needed when a request is started async but no read listener is registered.
if (_readListener == null)
{
if (LOG.isDebugEnabled())
LOG.debug("running without a read listener");
LOG.debug("running without a read listener {}", this);
onContentProducible();
return;
}
@ -321,7 +310,7 @@ public class HttpInput extends ServletInputStream implements Runnable
if (error != null)
{
if (LOG.isDebugEnabled())
LOG.debug("running has error: {}", (Object)error);
LOG.debug("running error={} {}", error, this);
// TODO is this necessary to add here?
_channelState.getHttpChannel().getResponse().getHttpFields().add(HttpConnection.CONNECTION_CLOSE);
_readListener.onError(error);
@ -331,13 +320,13 @@ public class HttpInput extends ServletInputStream implements Runnable
try
{
if (LOG.isDebugEnabled())
LOG.debug("running at EOF");
LOG.debug("running at EOF {}", this);
_readListener.onAllDataRead();
}
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onAllDataRead", x);
LOG.debug("running failed onAllDataRead {}", this, x);
_readListener.onError(x);
}
}
@ -345,7 +334,7 @@ public class HttpInput extends ServletInputStream implements Runnable
else
{
if (LOG.isDebugEnabled())
LOG.debug("running has content");
LOG.debug("running has content {}", this);
try
{
_readListener.onDataAvailable();
@ -353,7 +342,7 @@ public class HttpInput extends ServletInputStream implements Runnable
catch (Throwable x)
{
if (LOG.isDebugEnabled())
LOG.debug("running failed onDataAvailable", x);
LOG.debug("running failed onDataAvailable {}", this, x);
_readListener.onError(x);
}
}

View File

@ -90,7 +90,7 @@ public class HttpInputIntegrationTest
__server.addConnector(local);
ServerConnector http = new ServerConnector(__server, new HttpConnectionFactory(__config), new HTTP2CServerConnectionFactory(__config));
http.setIdleTimeout(4000);
http.setIdleTimeout(5000);
__server.addConnector(http);
// SSL Context Factory for HTTPS and HTTP/2
@ -119,7 +119,7 @@ public class HttpInputIntegrationTest
// HTTP/2 Connector
ServerConnector http2 = new ServerConnector(__server, ssl,/*TODO alpn,h2,*/ h1);
http2.setIdleTimeout(4000);
http2.setIdleTimeout(5000);
__server.addConnector(http2);
ServletContextHandler context = new ServletContextHandler(__server, "/ctx");
@ -336,7 +336,7 @@ public class HttpInputIntegrationTest
for (int i = 0; i < threads; i++)
{
t[i] = new Thread(run);
t[i] = new Thread(run, "client-" + i);
t[i].start();
}
for (int i = 0; i < threads; i++)