Fixes #8405 - onAllDataRead() is called twice under h2 if the stream … (#10174)

* Fixes #8405 - onAllDataRead() is called twice under h2 if the stream times out

Per Servlet semantic, HTTP/2 stream timeout should be ignored.

The code was trying to fail the read via `_contentDemander.onTimeout()`, but
then it was still calling `onContentProducible()`, which was returning `true`
because the state of the read was IDLE (all the request content was read) and
the request was suspended.

Now the code checks if the read was really failed; if it is not, then
`onContentProducible()` is not called and so the idle timeout is ignored.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2023-07-31 15:13:50 +02:00 committed by GitHub
parent e268917fb3
commit 87c24e7258
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 106 additions and 41 deletions

View File

@ -127,7 +127,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
boolean connect = request instanceof MetaData.ConnectRequest;
_delayedUntilContent = getHttpConfiguration().isDelayDispatchUntilContent() &&
!endStream && !_expect100Continue && !connect;
!endStream && !_expect100Continue && !connect;
// Delay the demand of DATA frames for CONNECT with :protocol
// or for normal requests expecting 100 continue.
@ -146,10 +146,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}, delayed={}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
_delayedUntilContent, System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), fields);
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
_delayedUntilContent, System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), fields);
}
return _delayedUntilContent ? null : this;
@ -179,9 +179,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
Stream stream = getStream();
LOG.debug("HTTP2 PUSH Request #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), request.getFields());
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(),
request.getMethod(), request.getURI(), request.getHttpVersion(),
System.lineSeparator(), request.getFields());
}
return this;
@ -222,8 +222,8 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
Stream stream = getStream();
LOG.debug("HTTP2 Commit Response #{}/{}:{}{} {} {}{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getHttpVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
stream.getId(), Integer.toHexString(stream.getSession().hashCode()), System.lineSeparator(), info.getHttpVersion(), info.getStatus(), info.getReason(),
System.lineSeparator(), info.getFields());
}
}
@ -276,13 +276,13 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}: {} bytes of {} content, woken: {}, needed: {}, handle: {}",
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
woken,
needed,
handle);
stream.getId(),
Integer.toHexString(stream.getSession().hashCode()),
length,
endStream ? "last" : "some",
woken,
needed,
handle);
}
boolean wasDelayed = _delayedUntilContent;
@ -622,8 +622,8 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
{
Stream stream = getStream();
LOG.debug("HTTP2 Request #{}/{}, trailers:{}{}",
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), trailers);
stream.getId(), Integer.toHexString(stream.getSession().hashCode()),
System.lineSeparator(), trailers);
}
// This will generate EOF -> need to call onContentProducible.
@ -645,7 +645,7 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
@Override
public boolean onTimeout(Throwable failure, Consumer<Runnable> consumer)
{
final boolean delayed = _delayedUntilContent;
boolean delayed = _delayedUntilContent;
_delayedUntilContent = false;
boolean reset = isIdle();
@ -655,10 +655,9 @@ public class HttpChannelOverHTTP2 extends HttpChannel implements Closeable, Writ
getHttpTransport().onStreamTimeout(failure);
failure.addSuppressed(new Throwable("HttpInput idle timeout"));
_contentDemander.onTimeout(failure);
boolean needed = getRequest().getHttpInput().onContentProducible();
if (needed || delayed)
boolean readFailed = _contentDemander.onTimeout(failure);
boolean handle = readFailed && getRequest().getHttpInput().onContentProducible();
if (handle || delayed)
{
consumer.accept(this::handleWithContext);
reset = false;

View File

@ -29,6 +29,7 @@ import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
@ -96,7 +97,9 @@ import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;
@ -1321,12 +1324,12 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"S1" +
"S2" +
"S3S3" +
"S4" +
"S5" +
"S6";
"S1" +
"S2" +
"S3S3" +
"S4" +
"S5" +
"S6";
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
@ -1509,13 +1512,13 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"0S" +
"1S" +
"2S" +
"3S" +
"4S" +
"5S" +
"6S";
"0S" +
"1S" +
"2S" +
"3S" +
"4S" +
"5S" +
"6S";
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
@ -1629,10 +1632,10 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
CountDownLatch clientLatch = new CountDownLatch(1);
String expected =
"0S" +
"2S" +
"4S" +
"6S";
"0S" +
"2S" +
"4S" +
"6S";
scenario.client.newRequest(scenario.newURI())
.method(HttpMethod.POST)
@ -1739,6 +1742,69 @@ public class AsyncIOServletTest extends AbstractTest<AsyncIOServletTest.AsyncTra
assertThat(failures, empty());
}
@ParameterizedTest
@ArgumentsSource(TransportProvider.class)
public void testOnAllDataReadCalledOnceThenIdleTimeout(Transport transport) throws Exception
{
init(transport);
AtomicInteger allDataReadCount = new AtomicInteger();
AtomicReference<Throwable> errorRef = new AtomicReference<>();
scenario.start(new HttpServlet()
{
@Override
protected void service(HttpServletRequest request, HttpServletResponse resp) throws IOException
{
AsyncContext asyncContext = request.startAsync();
asyncContext.setTimeout(0);
ServletInputStream input = request.getInputStream();
input.setReadListener(new ReadListener()
{
@Override
public void onDataAvailable() throws IOException
{
while (input.isReady())
{
int read = input.read();
if (read < 0)
break;
}
}
@Override
public void onAllDataRead()
{
allDataReadCount.incrementAndGet();
}
@Override
public void onError(Throwable x)
{
// There should be no errors because request body has
// been successfully read and idle timeouts are ignored.
errorRef.set(x);
}
});
// Never reply to the request, let it idle timeout.
// The Servlet semantic is that the idle timeout will
// be ignored so the client will timeout the request.
}
});
long idleTimeout = 1000;
scenario.setConnectionIdleTimeout(2 * idleTimeout);
scenario.setRequestIdleTimeout(idleTimeout);
assertThrows(TimeoutException.class, () -> scenario.client.newRequest(scenario.newURI())
.path(scenario.servletPath)
.timeout(2 * idleTimeout, TimeUnit.MILLISECONDS)
.send()
);
assertNull(errorRef.get());
assertEquals(1, allDataReadCount.get());
}
private static class Listener implements ReadListener, WriteListener
{
private final Executor executor = Executors.newFixedThreadPool(32);