From 8cf15659be0b41d89ac335c91059d8d17b413cdd Mon Sep 17 00:00:00 2001
From: Simone Bordet
Date: Wed, 28 Mar 2018 04:10:11 +0200
Subject: [PATCH] Issue #2349 - Review HTTP/2 max streams enforcement. (#2389)
* Issue #2349 - Review HTTP/2 max streams enforcement.
Changed the max concurrent remote streams enforcement algorithm.
It is now based on the stream count and the closing stream count,
updated atomically in a state machine in HTTP2Stream.
Fixed Javadoc.
Fixed close() method.
Signed-off-by: Simone Bordet
---
.../org/eclipse/jetty/http2/CloseState.java | 51 ++++++++-
.../org/eclipse/jetty/http2/HTTP2Session.java | 51 +++++----
.../org/eclipse/jetty/http2/HTTP2Stream.java | 104 ++++++++++++++++--
.../java/org/eclipse/jetty/http2/IStream.java | 6 +-
.../http2/client/http/EmptyServerHandler.java | 42 +++++++
.../client/http/MaxConcurrentStreamsTest.java | 50 ++++++---
6 files changed, 253 insertions(+), 51 deletions(-)
create mode 100644 jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/EmptyServerHandler.java
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/CloseState.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/CloseState.java
index e51db846ac1..5ee85cbec56 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/CloseState.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/CloseState.java
@@ -18,7 +18,56 @@
package org.eclipse.jetty.http2;
+/**
+ * The set of close states for a stream or a session.
+ *
+ * rcv hc
+ * NOT_CLOSED ---------------> REMOTELY_CLOSED
+ * | |
+ * gen| |gen
+ * hc| |hc
+ * | |
+ * v rcv hc v
+ * LOCALLY_CLOSING --------------> CLOSING
+ * | |
+ * snd| |gen
+ * hc| |hc
+ * | |
+ * v rcv hc v
+ * LOCALLY_CLOSED ----------------> CLOSED
+ *
+ */
public enum CloseState
{
- NOT_CLOSED, LOCALLY_CLOSED, REMOTELY_CLOSED, CLOSED
+ /**
+ * Fully open.
+ */
+ NOT_CLOSED,
+ /**
+ * A half-close frame has been generated.
+ */
+ LOCALLY_CLOSING,
+ /**
+ * A half-close frame has been generated and sent.
+ */
+ LOCALLY_CLOSED,
+ /**
+ * A half-close frame has been received.
+ */
+ REMOTELY_CLOSED,
+ /**
+ * A half-close frame has been received and a half-close frame has been generated, but not yet sent.
+ */
+ CLOSING,
+ /**
+ * Fully closed.
+ */
+ CLOSED;
+
+ public enum Event
+ {
+ RECEIVED,
+ BEFORE_SEND,
+ AFTER_SEND
+ }
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
index 2ebcb2dc6c1..83fa280c604 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Session.java
@@ -51,6 +51,7 @@ import org.eclipse.jetty.http2.generator.Generator;
import org.eclipse.jetty.http2.parser.Parser;
import org.eclipse.jetty.io.ByteBufferPool;
import org.eclipse.jetty.io.EndPoint;
+import org.eclipse.jetty.util.AtomicBiInteger;
import org.eclipse.jetty.util.Atomics;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.CountingCallback;
@@ -72,7 +73,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
private final AtomicInteger streamIds = new AtomicInteger();
private final AtomicInteger lastStreamId = new AtomicInteger();
private final AtomicInteger localStreamCount = new AtomicInteger();
- private final AtomicInteger remoteStreamCount = new AtomicInteger();
+ private final AtomicBiInteger remoteStreamCount = new AtomicBiInteger();
private final AtomicInteger sendWindow = new AtomicInteger();
private final AtomicInteger recvWindow = new AtomicInteger();
private final AtomicReference closed = new AtomicReference<>(CloseState.NOT_CLOSED);
@@ -718,14 +719,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
// SPEC: exceeding max concurrent streams is treated as stream error.
while (true)
{
- int remoteCount = remoteStreamCount.get();
+ long encoded = remoteStreamCount.get();
+ int remoteCount = AtomicBiInteger.getHi(encoded);
+ int remoteClosing = AtomicBiInteger.getLo(encoded);
int maxCount = getMaxRemoteStreams();
- if (maxCount >= 0 && remoteCount >= maxCount)
+ if (maxCount >= 0 && remoteCount - remoteClosing >= maxCount)
{
reset(new ResetFrame(streamId, ErrorCode.REFUSED_STREAM_ERROR.code), Callback.NOOP);
return null;
}
- if (remoteStreamCount.compareAndSet(remoteCount, remoteCount + 1))
+ if (remoteStreamCount.compareAndSet(encoded, remoteCount + 1, remoteClosing))
break;
}
@@ -748,6 +751,14 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
}
}
+ void updateStreamCount(boolean local, int deltaStreams, int deltaClosing)
+ {
+ if (local)
+ localStreamCount.addAndGet(deltaStreams);
+ else
+ remoteStreamCount.add(deltaStreams, deltaClosing);
+ }
+
protected IStream newStream(int streamId, boolean local)
{
return new HTTP2Stream(scheduler, this, streamId, local);
@@ -759,18 +770,10 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
IStream removed = streams.remove(stream.getId());
if (removed != null)
{
- boolean local = stream.isLocal();
- if (local)
- localStreamCount.decrementAndGet();
- else
- remoteStreamCount.decrementAndGet();
-
onStreamClosed(stream);
-
flowControl.onStreamDestroyed(stream);
-
if (LOG.isDebugEnabled())
- LOG.debug("Removed {} {}", local ? "local" : "remote", stream);
+ LOG.debug("Removed {} {}", stream.isLocal() ? "local" : "remote", stream);
}
}
@@ -1167,7 +1170,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
bytes = frameBytes = generator.control(lease, frame);
if (LOG.isDebugEnabled())
LOG.debug("Generated {}", frame);
- prepare();
+ beforeSend();
return true;
}
@@ -1184,10 +1187,16 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
* sender, the action may have not been performed yet, causing the larger
* data to be rejected, when it should have been accepted.
*/
- private void prepare()
+ private void beforeSend()
{
switch (frame.getType())
{
+ case HEADERS:
+ {
+ HeadersFrame headersFrame = (HeadersFrame)frame;
+ stream.updateClose(headersFrame.isEndStream(), CloseState.Event.BEFORE_SEND);
+ break;
+ }
case SETTINGS:
{
SettingsFrame settingsFrame = (SettingsFrame)frame;
@@ -1213,7 +1222,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
onStreamOpened(stream);
HeadersFrame headersFrame = (HeadersFrame)frame;
- if (stream.updateClose(headersFrame.isEndStream(), true))
+ if (stream.updateClose(headersFrame.isEndStream(), CloseState.Event.AFTER_SEND))
removeStream(stream);
break;
}
@@ -1230,7 +1239,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// Pushed streams are implicitly remotely closed.
// They are closed when sending an end-stream DATA frame.
- stream.updateClose(true, false);
+ stream.updateClose(true, CloseState.Event.RECEIVED);
break;
}
case GO_AWAY:
@@ -1317,15 +1326,17 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
int length = Math.min(dataBytes, window);
// Only one DATA frame is generated.
- bytes = frameBytes = generator.data(lease, (DataFrame)frame, length);
+ DataFrame dataFrame = (DataFrame)frame;
+ bytes = frameBytes = generator.data(lease, dataFrame, length);
int written = bytes - Frame.HEADER_LENGTH;
if (LOG.isDebugEnabled())
- LOG.debug("Generated {}, length/window/data={}/{}/{}", frame, written, window, dataBytes);
+ LOG.debug("Generated {}, length/window/data={}/{}/{}", dataFrame, written, window, dataBytes);
this.dataWritten = written;
this.dataBytes -= written;
flowControl.onDataSending(stream, written);
+ stream.updateClose(dataFrame.isEndStream(), CloseState.Event.BEFORE_SEND);
return true;
}
@@ -1342,7 +1353,7 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
{
// Only now we can update the close state
// and eventually remove the stream.
- if (stream.updateClose(dataFrame.isEndStream(), true))
+ if (stream.updateClose(dataFrame.isEndStream(), CloseState.Event.AFTER_SEND))
removeStream(stream);
super.succeeded();
}
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
index 91e635d6b96..6320b9efab1 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java
@@ -264,7 +264,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
private void onHeaders(HeadersFrame frame, Callback callback)
{
- if (updateClose(frame.isEndStream(), false))
+ if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
callback.succeeded();
}
@@ -295,7 +295,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
return;
}
- if (updateClose(frame.isEndStream(), false))
+ if (updateClose(frame.isEndStream(), CloseState.Event.RECEIVED))
session.removeStream(this);
notifyData(this, frame, callback);
}
@@ -312,7 +312,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
// Pushed streams are implicitly locally closed.
// They are closed when receiving an end-stream DATA frame.
- updateClose(true, true);
+ updateClose(true, CloseState.Event.AFTER_SEND);
callback.succeeded();
}
@@ -322,14 +322,29 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
}
@Override
- public boolean updateClose(boolean update, boolean local)
+ public boolean updateClose(boolean update, CloseState.Event event)
{
if (LOG.isDebugEnabled())
- LOG.debug("Update close for {} close={} local={}", this, update, local);
+ LOG.debug("Update close for {} update={} event={}", this, update, event);
if (!update)
return false;
+ switch (event)
+ {
+ case RECEIVED:
+ return updateCloseAfterReceived();
+ case BEFORE_SEND:
+ return updateCloseBeforeSend();
+ case AFTER_SEND:
+ return updateCloseAfterSend();
+ default:
+ return false;
+ }
+ }
+
+ private boolean updateCloseAfterReceived()
+ {
while (true)
{
CloseState current = closeState.get();
@@ -337,22 +352,79 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
{
case NOT_CLOSED:
{
- CloseState newValue = local ? CloseState.LOCALLY_CLOSED : CloseState.REMOTELY_CLOSED;
- if (closeState.compareAndSet(current, newValue))
+ if (closeState.compareAndSet(current, CloseState.REMOTELY_CLOSED))
return false;
break;
}
+ case LOCALLY_CLOSING:
+ {
+ if (closeState.compareAndSet(current, CloseState.CLOSING))
+ {
+ updateStreamCount(0, 1);
+ return false;
+ }
+ break;
+ }
case LOCALLY_CLOSED:
{
- if (local)
- return false;
close();
return true;
}
+ default:
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ private boolean updateCloseBeforeSend()
+ {
+ while (true)
+ {
+ CloseState current = closeState.get();
+ switch (current)
+ {
+ case NOT_CLOSED:
+ {
+ if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSING))
+ return false;
+ break;
+ }
case REMOTELY_CLOSED:
{
- if (!local)
+ if (closeState.compareAndSet(current, CloseState.CLOSING))
+ {
+ updateStreamCount(0, 1);
return false;
+ }
+ break;
+ }
+ default:
+ {
+ return false;
+ }
+ }
+ }
+ }
+
+ private boolean updateCloseAfterSend()
+ {
+ while (true)
+ {
+ CloseState current = closeState.get();
+ switch (current)
+ {
+ case NOT_CLOSED:
+ case LOCALLY_CLOSING:
+ {
+ if (closeState.compareAndSet(current, CloseState.LOCALLY_CLOSED))
+ return false;
+ break;
+ }
+ case REMOTELY_CLOSED:
+ case CLOSING:
+ {
close();
return true;
}
@@ -389,8 +461,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
@Override
public void close()
{
- if (closeState.getAndSet(CloseState.CLOSED) != CloseState.CLOSED)
+ CloseState oldState = closeState.getAndSet(CloseState.CLOSED);
+ if (oldState != CloseState.CLOSED)
+ {
+ int deltaClosing = oldState == CloseState.CLOSING ? -1 : 0;
+ updateStreamCount(-1, deltaClosing);
onClose();
+ }
+ }
+
+ private void updateStreamCount(int deltaStream, int deltaClosing)
+ {
+ ((HTTP2Session)session).updateStreamCount(isLocal(), deltaStream, deltaClosing);
}
@Override
diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
index 3fe92bbaf23..95dc4f4fb87 100644
--- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
+++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/IStream.java
@@ -76,12 +76,10 @@ public interface IStream extends Stream, Closeable
* Updates the close state of this stream.
*
* @param update whether to update the close state
- * @param local whether the update comes from a local operation
- * (such as sending a frame that ends the stream)
- * or a remote operation (such as receiving a frame
+ * @param event the event that caused the close state update
* @return whether the stream has been fully closed by this invocation
*/
- public boolean updateClose(boolean update, boolean local);
+ public boolean updateClose(boolean update, CloseState.Event event);
/**
* Forcibly closes this stream.
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/EmptyServerHandler.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/EmptyServerHandler.java
new file mode 100644
index 00000000000..cd146e572d6
--- /dev/null
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/EmptyServerHandler.java
@@ -0,0 +1,42 @@
+//
+// ========================================================================
+// Copyright (c) 1995-2018 Mort Bay Consulting Pty. Ltd.
+// ------------------------------------------------------------------------
+// All rights reserved. This program and the accompanying materials
+// are made available under the terms of the Eclipse Public License v1.0
+// and Apache License v2.0 which accompanies this distribution.
+//
+// The Eclipse Public License is available at
+// http://www.eclipse.org/legal/epl-v10.html
+//
+// The Apache License v2.0 is available at
+// http://www.opensource.org/licenses/apache2.0.php
+//
+// You may elect to redistribute this code under either of these licenses.
+// ========================================================================
+//
+
+package org.eclipse.jetty.http2.client.http;
+
+import java.io.IOException;
+
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.eclipse.jetty.server.Request;
+import org.eclipse.jetty.server.handler.AbstractHandler;
+
+public class EmptyServerHandler extends AbstractHandler.ErrorDispatchHandler
+{
+ @Override
+ protected final void doNonErrorHandle(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ jettyRequest.setHandled(true);
+ service(target, jettyRequest, request, response);
+ }
+
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ {
+ }
+}
diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
index a327c99207e..f3ce3a830ba 100644
--- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
+++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MaxConcurrentStreamsTest.java
@@ -18,11 +18,10 @@
package org.eclipse.jetty.http2.client.http;
-import java.io.IOException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
+import java.util.stream.IntStream;
-import javax.servlet.ServletException;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
@@ -32,7 +31,6 @@ import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
import org.eclipse.jetty.server.Handler;
import org.eclipse.jetty.server.HttpConfiguration;
import org.eclipse.jetty.server.Request;
-import org.eclipse.jetty.server.handler.AbstractHandler;
import org.junit.Assert;
import org.junit.Test;
@@ -53,12 +51,11 @@ public class MaxConcurrentStreamsTest extends AbstractTest
public void testOneConcurrentStream() throws Exception
{
long sleep = 1000;
- start(1, new AbstractHandler()
+ start(1, new EmptyServerHandler()
{
@Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
- baseRequest.setHandled(true);
// Sleep a bit to allow the second request to be queued.
sleep(sleep);
}
@@ -91,17 +88,42 @@ public class MaxConcurrentStreamsTest extends AbstractTest
Assert.assertTrue(latch.await(5 * sleep, TimeUnit.MILLISECONDS));
}
+ @Test
+ public void testManyIterationsWithConcurrentStreams() throws Exception
+ {
+ int concurrency = 1;
+ start(concurrency, new EmptyServerHandler());
+
+ int iterations = 50;
+ IntStream.range(0, concurrency).parallel().forEach(i ->
+ IntStream.range(0, iterations).forEach(j ->
+ {
+ try
+ {
+ ContentResponse response = client.newRequest("localhost", connector.getLocalPort())
+ .path("/" + i + "_" + j)
+ .timeout(5, TimeUnit.SECONDS)
+ .send();
+ Assert.assertEquals(HttpStatus.OK_200, response.getStatus());
+ }
+ catch (Throwable x)
+ {
+ throw new RuntimeException(x);
+ }
+ })
+ );
+ }
+
@Test
public void testTwoConcurrentStreamsThirdWaits() throws Exception
{
int maxStreams = 2;
long sleep = 1000;
- start(maxStreams, new AbstractHandler()
+ start(maxStreams, new EmptyServerHandler()
{
@Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
- baseRequest.setHandled(true);
sleep(sleep);
}
});
@@ -140,12 +162,11 @@ public class MaxConcurrentStreamsTest extends AbstractTest
public void testAbortedWhileQueued() throws Exception
{
long sleep = 1000;
- start(1, new AbstractHandler()
+ start(1, new EmptyServerHandler()
{
@Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
- baseRequest.setHandled(true);
sleep(sleep);
}
});
@@ -170,12 +191,11 @@ public class MaxConcurrentStreamsTest extends AbstractTest
{
int maxConcurrent = 10;
long sleep = 500;
- start(maxConcurrent, new AbstractHandler()
+ start(maxConcurrent, new EmptyServerHandler()
{
@Override
- public void handle(String target, Request baseRequest, HttpServletRequest request, HttpServletResponse response) throws IOException, ServletException
+ protected void service(String target, Request jettyRequest, HttpServletRequest request, HttpServletResponse response)
{
- baseRequest.setHandled(true);
sleep(sleep);
}
});