diff --git a/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStalledTest.java b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStalledTest.java new file mode 100644 index 00000000000..8af193c8c7f --- /dev/null +++ b/jetty-http2/http2-client/src/test/java/org/eclipse/jetty/http2/client/FlowControlStalledTest.java @@ -0,0 +1,318 @@ +// +// ======================================================================== +// Copyright (c) 1995-2015 Mort Bay Consulting Pty. Ltd. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.http2.client; + +import java.net.InetSocketAddress; +import java.nio.ByteBuffer; +import java.util.ArrayDeque; +import java.util.HashMap; +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Supplier; + +import org.eclipse.jetty.http.HostPortHttpField; +import org.eclipse.jetty.http.HttpFields; +import org.eclipse.jetty.http.HttpScheme; +import org.eclipse.jetty.http.HttpStatus; +import org.eclipse.jetty.http.HttpVersion; +import org.eclipse.jetty.http.MetaData; +import org.eclipse.jetty.http2.BufferingFlowControlStrategy; +import org.eclipse.jetty.http2.FlowControlStrategy; +import org.eclipse.jetty.http2.ISession; +import org.eclipse.jetty.http2.IStream; +import org.eclipse.jetty.http2.api.Session; +import org.eclipse.jetty.http2.api.Stream; +import org.eclipse.jetty.http2.api.server.ServerSessionListener; +import org.eclipse.jetty.http2.frames.DataFrame; +import org.eclipse.jetty.http2.frames.HeadersFrame; +import org.eclipse.jetty.http2.frames.SettingsFrame; +import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.toolchain.test.TestTracker; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.FuturePromise; +import org.eclipse.jetty.util.Promise; +import org.eclipse.jetty.util.thread.QueuedThreadPool; +import org.junit.After; +import org.junit.Assert; +import org.junit.Rule; +import org.junit.Test; + +public class FlowControlStalledTest +{ + @Rule + public TestTracker tracker = new TestTracker(); + protected ServerConnector connector; + protected HTTP2Client client; + protected Server server; + + protected void start(Supplier flowControlFactory, ServerSessionListener listener) throws Exception + { + QueuedThreadPool serverExecutor = new QueuedThreadPool(); + serverExecutor.setName("server"); + server = new Server(serverExecutor); + connector = new ServerConnector(server, new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), listener) + { + @Override + protected FlowControlStrategy newFlowControlStrategy() + { + return flowControlFactory.get(); + } + }); + server.addConnector(connector); + server.start(); + + client = new HTTP2Client(); + QueuedThreadPool clientExecutor = new QueuedThreadPool(); + clientExecutor.setName("client"); + client.setExecutor(clientExecutor); + client.setClientConnectionFactory(new HTTP2ClientConnectionFactory() + { + @Override + protected FlowControlStrategy newFlowControlStrategy() + { + return flowControlFactory.get(); + } + }); + client.start(); + } + + protected Session newClient(Session.Listener listener) throws Exception + { + String host = "localhost"; + int port = connector.getLocalPort(); + InetSocketAddress address = new InetSocketAddress(host, port); + FuturePromise promise = new FuturePromise<>(); + client.connect(address, listener, promise); + return promise.get(5, TimeUnit.SECONDS); + } + + protected MetaData.Request newRequest(String method, String target, HttpFields fields) + { + String host = "localhost"; + int port = connector.getLocalPort(); + String authority = host + ":" + port; + return new MetaData.Request(method, HttpScheme.HTTP, new HostPortHttpField(authority), target, HttpVersion.HTTP_2, fields); + } + + @After + public void dispose() throws Exception + { + // Allow WINDOW_UPDATE frames to be sent/received to avoid exception stack traces. + Thread.sleep(1000); + client.stop(); + server.stop(); + } + + @Test + public void testStreamStalledIsInvokedOnlyOnce() throws Exception + { + AtomicReference stallLatch = new AtomicReference<>(new CountDownLatch(1)); + CountDownLatch unstallLatch = new CountDownLatch(1); + start(() -> new BufferingFlowControlStrategy(0.5f) + { + @Override + public void onStreamStalled(IStream stream) + { + super.onStreamStalled(stream); + stallLatch.get().countDown(); + } + + @Override + protected void onStreamUnstalled(IStream stream) + { + super.onStreamUnstalled(stream); + unstallLatch.countDown(); + } + }, new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + + if (request.getURIString().endsWith("/stall")) + { + stream.headers(new HeadersFrame(stream.getId(), response, null, false), new Callback() + { + @Override + public void succeeded() + { + // Send a large chunk of data so the stream gets stalled. + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE + 1); + stream.data(new DataFrame(stream.getId(), data, true), NOOP); + } + }); + } + else + { + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + + return null; + } + }); + + // Use a large session window so that only the stream gets stalled. + client.setInitialSessionRecvWindow(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE); + Session client = newClient(new Session.Listener.Adapter()); + + CountDownLatch latch = new CountDownLatch(1); + Queue callbacks = new ArrayDeque<>(); + MetaData.Request request = newRequest("GET", "/stall", new HttpFields()); + client.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callbacks.offer(callback); + if (frame.isEndStream()) + latch.countDown(); + } + }); + + Assert.assertTrue(stallLatch.get().await(5, TimeUnit.SECONDS)); + + // First stream is now stalled, check that writing a second stream + // does not result in the first be notified again of being stalled. + stallLatch.set(new CountDownLatch(1)); + + request = newRequest("GET", "/", new HttpFields()); + client.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()); + + Assert.assertFalse(stallLatch.get().await(1, TimeUnit.SECONDS)); + + // Consume all data. + while (!latch.await(10, TimeUnit.MILLISECONDS)) + { + Callback callback = callbacks.poll(); + if (callback != null) + callback.succeeded(); + } + + // Make sure the unstall callback is invoked. + Assert.assertTrue(unstallLatch.await(5, TimeUnit.SECONDS)); + } + + @Test + public void testSessionStalledIsInvokedOnlyOnce() throws Exception + { + AtomicReference stallLatch = new AtomicReference<>(new CountDownLatch(1)); + CountDownLatch unstallLatch = new CountDownLatch(1); + start(() -> new BufferingFlowControlStrategy(0.5f) + { + @Override + public void onSessionStalled(ISession session) + { + super.onSessionStalled(session); + stallLatch.get().countDown(); + } + + @Override + protected void onSessionUnstalled(ISession session) + { + super.onSessionUnstalled(session); + unstallLatch.countDown(); + } + }, new ServerSessionListener.Adapter() + { + @Override + public Stream.Listener onNewStream(Stream stream, HeadersFrame frame) + { + MetaData.Request request = (MetaData.Request)frame.getMetaData(); + MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields()); + + if (request.getURIString().endsWith("/stall")) + { + stream.headers(new HeadersFrame(stream.getId(), response, null, false), new Callback() + { + @Override + public void succeeded() + { + // Send a large chunk of data so the session gets stalled. + ByteBuffer data = ByteBuffer.allocate(FlowControlStrategy.DEFAULT_WINDOW_SIZE + 1); + stream.data(new DataFrame(stream.getId(), data, true), NOOP); + } + }); + } + else + { + stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP); + } + + return null; + } + }); + + // Use a large stream window so that only the session gets stalled. + client.setInitialStreamRecvWindow(5 * FlowControlStrategy.DEFAULT_WINDOW_SIZE); + Session session = newClient(new Session.Listener.Adapter() + { + @Override + public Map onPreface(Session session) + { + Map settings = new HashMap<>(); + settings.put(SettingsFrame.INITIAL_WINDOW_SIZE, client.getInitialStreamRecvWindow()); + return settings; + } + }); + + CountDownLatch latch = new CountDownLatch(1); + Queue callbacks = new ArrayDeque<>(); + MetaData.Request request = newRequest("GET", "/stall", new HttpFields()); + session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter() + { + @Override + public void onData(Stream stream, DataFrame frame, Callback callback) + { + callbacks.offer(callback); + if (frame.isEndStream()) + latch.countDown(); + } + }); + + Assert.assertTrue(stallLatch.get().await(5, TimeUnit.SECONDS)); + + // The session is now stalled, check that writing a second stream + // does not result in the session be notified again of being stalled. + stallLatch.set(new CountDownLatch(1)); + + request = newRequest("GET", "/", new HttpFields()); + session.newStream(new HeadersFrame(request, null, true), new Promise.Adapter<>(), new Stream.Listener.Adapter()); + + Assert.assertFalse(stallLatch.get().await(1, TimeUnit.SECONDS)); + + // Consume all data. + while (!latch.await(10, TimeUnit.MILLISECONDS)) + { + Callback callback = callbacks.poll(); + if (callback != null) + callback.succeeded(); + } + + // Make sure the unstall callback is invoked. + Assert.assertTrue(unstallLatch.await(5, TimeUnit.SECONDS)); + } +} diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java index e6ebc250eb1..b2908107a70 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/AbstractFlowControlStrategy.java @@ -18,10 +18,13 @@ package org.eclipse.jetty.http2; +import java.util.concurrent.atomic.AtomicLong; + import org.eclipse.jetty.http2.api.Stream; import org.eclipse.jetty.http2.frames.WindowUpdateFrame; import org.eclipse.jetty.util.annotation.ManagedAttribute; import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.annotation.ManagedOperation; import org.eclipse.jetty.util.log.Log; import org.eclipse.jetty.util.log.Logger; @@ -30,6 +33,7 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy { protected static final Logger LOG = Log.getLogger(FlowControlStrategy.class); + private final AtomicLong sessionStalls = new AtomicLong(); private int initialStreamSendWindow; private int initialStreamRecvWindow; @@ -148,13 +152,19 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy return; ISession session = stream.getSession(); - int oldSize = session.updateSendWindow(-length); + int oldSessionWindow = session.updateSendWindow(-length); + int newSessionWindow = oldSessionWindow - length; if (LOG.isDebugEnabled()) - LOG.debug("Updated session send window {} -> {} for {}", oldSize, oldSize - length, session); + LOG.debug("Updated session send window {} -> {} for {}", oldSessionWindow, newSessionWindow, session); + if (newSessionWindow <= 0) + onSessionStalled(session); - oldSize = stream.updateSendWindow(-length); + int oldStreamWindow = stream.updateSendWindow(-length); + int newStreamWindow = oldStreamWindow - length; if (LOG.isDebugEnabled()) - LOG.debug("Updated stream send window {} -> {} for {}", oldSize, oldSize - length, stream); + LOG.debug("Updated stream send window {} -> {} for {}", oldStreamWindow, newStreamWindow, stream); + if (newStreamWindow <= 0) + onStreamStalled(stream); } @Override @@ -162,15 +172,14 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy { } - @Override - public void onSessionStalled(ISession session) + protected void onSessionStalled(ISession session) { if (LOG.isDebugEnabled()) LOG.debug("Session stalled {}", session); + sessionStalls.incrementAndGet(); } - @Override - public void onStreamStalled(IStream stream) + protected void onStreamStalled(IStream stream) { if (LOG.isDebugEnabled()) LOG.debug("Stream stalled {}", stream); @@ -187,4 +196,16 @@ public abstract class AbstractFlowControlStrategy implements FlowControlStrategy if (LOG.isDebugEnabled()) LOG.debug("Stream unstalled {}", stream); } + + @ManagedAttribute(value = "The number of times the session flow control has stalled", readonly = true) + public long getSessionStallCount() + { + return sessionStalls.get(); + } + + @ManagedOperation(value = "Resets the statistics", impact = "ACTION") + public void reset() + { + sessionStalls.set(0); + } } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java index 2907abdc00b..d4e86b16f28 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/FlowControlStrategy.java @@ -41,8 +41,4 @@ public interface FlowControlStrategy public void onDataSending(IStream stream, int length); public void onDataSent(IStream stream, int length); - - public void onSessionStalled(ISession session); - - public void onStreamStalled(IStream stream); } diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java index 9dbb4068508..10cb349562d 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Flusher.java @@ -173,10 +173,8 @@ public class HTTP2Flusher extends IteratingCallback int remaining = entry.dataRemaining(); if (remaining > 0) { - FlowControlStrategy flowControl = session.getFlowControlStrategy(); if (sessionWindow <= 0) { - flowControl.onSessionStalled(session); ++index; // There may be *non* flow controlled frames to send. continue; @@ -195,7 +193,6 @@ public class HTTP2Flusher extends IteratingCallback // Is it a frame belonging to an already stalled stream ? if (streamWindow <= 0) { - flowControl.onStreamStalled(stream); ++index; // There may be *non* flow controlled frames to send. continue;