diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java index 059a07ace9b..6c984dcae52 100644 --- a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/IdleTimeoutTest.java @@ -26,6 +26,7 @@ import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import javax.servlet.ServletException; +import javax.servlet.ServletInputStream; import javax.servlet.http.HttpServlet; import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; @@ -33,6 +34,7 @@ import javax.servlet.http.HttpServletResponse; import org.eclipse.jetty.http.HttpFields; import org.eclipse.jetty.http.HttpVersion; import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.FlowControlStrategy; import org.eclipse.jetty.http2.HTTP2Session; import org.eclipse.jetty.http2.api.Session; import org.eclipse.jetty.http2.api.Stream; @@ -44,12 +46,11 @@ import org.eclipse.jetty.http2.frames.ResetFrame; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.FuturePromise; import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.log.Log; +import org.hamcrest.Matchers; import org.junit.Assert; import org.junit.Test; -import static org.hamcrest.core.IsInstanceOf.instanceOf; -import static org.junit.Assert.assertThat; - public class IdleTimeoutTest extends AbstractTest { private final int idleTimeout = 1000; @@ -364,7 +365,7 @@ public class IdleTimeoutTest extends AbstractTest @Override public void onTimeout(Stream stream, Throwable x) { - assertThat(x, instanceOf(TimeoutException.class)); + Assert.assertThat(x, Matchers.instanceOf(TimeoutException.class)); timeoutLatch.countDown(); } }); @@ -531,6 +532,55 @@ public class IdleTimeoutTest extends AbstractTest Assert.assertFalse(resetLatch.await(0, TimeUnit.SECONDS)); } + @Test + public void testBufferedReadsResetStreamIdleTimeout() throws Exception + { + long delay = 1000; + start(new HttpServlet() + { + @Override + protected void service(HttpServletRequest request, HttpServletResponse response) throws ServletException, IOException + { + ServletInputStream input = request.getInputStream(); + byte[] buffer = new byte[8192]; + while (true) + { + int read = input.read(buffer); + Log.getLogger(IdleTimeoutTest.class).info("Read {} bytes", read); + if (read < 0) + break; + sleep(delay); + } + } + }); + connector.setIdleTimeout(2 * delay); + + Session session = newClient(new Session.Listener.Adapter()); + MetaData.Request metaData = newRequest("POST", new HttpFields()); + HeadersFrame requestFrame = new HeadersFrame(metaData, null, false); + FuturePromise promise = new FuturePromise<>(); + CountDownLatch latch = new CountDownLatch(1); + session.newStream(requestFrame, promise, new Stream.Listener.Adapter() + { + @Override + public void onHeaders(Stream stream, HeadersFrame frame) + { + if (frame.isEndStream()) + latch.countDown(); + } + }); + Stream stream = promise.get(5, TimeUnit.SECONDS); + + // Send data larger than the flow control window. + // The client will send bytes up to the flow control window immediately + // and they will be buffered by the server, which will read them slowly. + // Server reads should reset the idle timeout. + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE + 1); + stream.data(new DataFrame(stream.getId(), data, true), Callback.NOOP); + + Assert.assertTrue(latch.await(555, TimeUnit.SECONDS)); + } + private void sleep(long value) { try diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java index 58d812e88b6..2b2c640b6fe 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java @@ -28,6 +28,7 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; @@ -84,6 +85,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio private int maxRemoteStreams; private long streamIdleTimeout; private boolean pushEnabled; + private long idleTime; public HTTP2Session(Scheduler scheduler, EndPoint endPoint, Generator generator, Session.Listener listener, FlowControlStrategy flowControl, int initialStreamId) { @@ -100,6 +102,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio this.sendWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.recvWindow.set(FlowControlStrategy.DEFAULT_WINDOW_SIZE); this.pushEnabled = true; // SPEC: by default, push is enabled. + this.idleTime = System.nanoTime(); } @Override @@ -183,7 +186,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio @Override public void succeeded() { - flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); + complete(); } @Override @@ -191,6 +194,13 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { // Consume also in case of failures, to free the // session flow control window for other streams. + complete(); + } + + private void complete() + { + notIdle(); + stream.notIdle(); flowControl.onDataConsumed(HTTP2Session.this, stream, flowControlLength); } }); @@ -408,7 +418,10 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { IStream stream = getStream(streamId); if (stream != null) + { + stream.process(frame, Callback.NOOP); onWindowUpdate(stream, frame); + } } else { @@ -619,7 +632,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio // Ping frames are prepended to process them as soon as possible. boolean queued = entry.frame.getType() == FrameType.PING ? flusher.prepend(entry) : flusher.append(entry); if (queued && flush) + { + if (entry.stream != null) + entry.stream.notIdle(); flusher.iterate(); + } } protected IStream createLocalStream(int streamId, Promise promise) @@ -866,6 +883,9 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio { case NOT_CLOSED: { + long elapsed = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - idleTime); + if (elapsed < endPoint.getIdleTimeout()) + return false; return notifyIdleTimeout(this); } case LOCALLY_CLOSED: @@ -881,6 +901,11 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio } } + private void notIdle() + { + idleTime = System.nanoTime(); + } + @Override public void onFrame(Frame frame) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index e6a9c4a2308..a9ed27fa67b 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -33,6 +33,7 @@ import org.eclipse.jetty.http2.frames.Frame; import org.eclipse.jetty.http2.frames.HeadersFrame; import org.eclipse.jetty.http2.frames.PushPromiseFrame; import org.eclipse.jetty.http2.frames.ResetFrame; +import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.io.IdleTimeout; import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Promise; @@ -87,14 +88,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback { if (!checkWrite(callback)) return; - notIdle(); session.frames(this, this, frame, Frame.EMPTY_ARRAY); } @Override public void push(PushPromiseFrame frame, Promise promise, Listener listener) { - notIdle(); session.push(this, promise, frame, listener); } @@ -103,7 +102,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback { if (!checkWrite(callback)) return; - notIdle(); session.data(this, this, frame); } @@ -112,7 +110,6 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback { if (isReset()) return; - notIdle(); localReset = true; session.frames(this, callback, frame, Frame.EMPTY_ARRAY); } @@ -240,6 +237,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback onPush((PushPromiseFrame)frame, callback); break; } + case WINDOW_UPDATE: + { + onWindowUpdate((WindowUpdateFrame)frame, callback); + break; + } default: { throw new UnsupportedOperationException(); @@ -302,6 +304,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback callback.succeeded(); } + private void onWindowUpdate(WindowUpdateFrame frame, Callback callback) + { + callback.succeeded(); + } + @Override public boolean updateClose(boolean update, boolean local) { diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java index 32b2f8ef5da..6d79f9b9349 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java @@ -99,4 +99,10 @@ public interface IStream extends Stream, Closeable * @return the previous value of the stream receive window */ public int updateRecvWindow(int delta); + + /** + *

Marks this stream as not idle so that the + * {@link #getIdleTimeout() idle timeout} is postponed.

+ */ + public void notIdle(); }