Issue #1759 - HTTP/2 producer can block in onReset.

Now both failures and timeouts, when they must call the application,
do so by dispatching a Runnable to avoid to block the caller thread.
This commit is contained in:
Simone Bordet 2017-08-21 15:53:56 +02:00
parent f7925aebd2
commit 154824049b
2 changed files with 39 additions and 17 deletions

View File

@ -156,7 +156,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
public boolean onStreamTimeout(IStream stream, Throwable failure) public boolean onStreamTimeout(IStream stream, Throwable failure)
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
boolean result = channel != null && channel.onStreamTimeout(failure); boolean result = channel != null && channel.onStreamTimeout(failure, task -> offerTask(task, true));
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", stream, failure);
return result; return result;
@ -168,7 +168,11 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
LOG.debug("Processing failure on {}: {}", stream, failure); LOG.debug("Processing failure on {}: {}", stream, failure);
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null) if (channel != null)
channel.onFailure(failure); {
Runnable task = channel.onFailure(failure);
if (task != null)
offerTask(task, true);
}
} }
public boolean onSessionTimeout(Throwable failure) public boolean onSessionTimeout(Throwable failure)
@ -179,7 +183,7 @@ public class HTTP2ServerConnection extends HTTP2Connection implements Connection
{ {
HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE); HttpChannelOverHTTP2 channel = (HttpChannelOverHTTP2)stream.getAttribute(IStream.CHANNEL_ATTRIBUTE);
if (channel != null) if (channel != null)
result &= !channel.isRequestExecuting(); result &= channel.isRequestIdle();
} }
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure); LOG.debug("{} idle timeout on {}: {}", result ? "Processed" : "Ignored", session, failure);

View File

@ -20,6 +20,7 @@ package org.eclipse.jetty.http2.server;
import java.io.IOException; import java.io.IOException;
import java.nio.ByteBuffer; import java.nio.ByteBuffer;
import java.util.function.Consumer;
import org.eclipse.jetty.http.BadMessageException; import org.eclipse.jetty.http.BadMessageException;
import org.eclipse.jetty.http.HttpField; import org.eclipse.jetty.http.HttpField;
@ -40,6 +41,7 @@ import org.eclipse.jetty.server.Connector;
import org.eclipse.jetty.server.HttpChannel; import org.eclipse.jetty.server.HttpChannel;
import org.eclipse.jetty.server.HttpConfiguration; import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.HttpInput; import org.eclipse.jetty.server.HttpInput;
import org.eclipse.jetty.server.handler.ContextHandler;
import org.eclipse.jetty.util.BufferUtil; import org.eclipse.jetty.util.BufferUtil;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Log;
@ -277,32 +279,39 @@ public class HttpChannelOverHTTP2 extends HttpChannel
return handle || wasDelayed ? this : null; return handle || wasDelayed ? this : null;
} }
public boolean isRequestExecuting() public boolean isRequestIdle()
{ {
return !getState().isIdle(); return getState().isIdle();
} }
public boolean onStreamTimeout(Throwable failure) public boolean onStreamTimeout(Throwable failure, Consumer<Runnable> consumer)
{ {
boolean result = false;
if (isRequestIdle())
{
consumeInput();
result = true;
}
getHttpTransport().onStreamTimeout(failure); getHttpTransport().onStreamTimeout(failure);
if (getRequest().getHttpInput().onIdleTimeout(failure)) if (getRequest().getHttpInput().onIdleTimeout(failure))
handle(); consumer.accept(this::handleWithContext);
if (isRequestExecuting()) return result;
return false;
consumeInput();
return true;
} }
public void onFailure(Throwable failure) public Runnable onFailure(Throwable failure)
{ {
getHttpTransport().onStreamFailure(failure); getHttpTransport().onStreamFailure(failure);
if (getRequest().getHttpInput().failed(failure)) boolean handle = getRequest().getHttpInput().failed(failure);
handle();
else
getState().asyncError(failure);
consumeInput(); consumeInput();
return () ->
{
if (handle)
handleWithContext();
else
getState().asyncError(failure);
};
} }
protected void consumeInput() protected void consumeInput()
@ -310,6 +319,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel
getRequest().getHttpInput().consumeAll(); getRequest().getHttpInput().consumeAll();
} }
private void handleWithContext()
{
ContextHandler context = getState().getContextHandler();
if (context != null)
context.handle(getRequest(), this);
else
handle();
}
/** /**
* If the associated response has the Expect header set to 100 Continue, * If the associated response has the Expect header set to 100 Continue,
* then accessing the input stream indicates that the handler/servlet * then accessing the input stream indicates that the handler/servlet