* Fixes #6208 - HTTP/2 max local stream count exceeded Backported from Jetty 10 the "new stream" event so that the Stream can be set early on the client's `HttpChannelOverHTTP2`. In this way, when a HEADERS frame stalled due to TCP congestion is failed, the corresponding Stream is closed and the connection released to the pool, fixing the "max stream exceeded" issue. Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
parent
e3abe7b498
commit
2f19c67b41
|
@ -31,6 +31,7 @@ import java.util.Queue;
|
|||
import java.util.Set;
|
||||
|
||||
import org.eclipse.jetty.http2.frames.Frame;
|
||||
import org.eclipse.jetty.http2.frames.FrameType;
|
||||
import org.eclipse.jetty.http2.frames.WindowUpdateFrame;
|
||||
import org.eclipse.jetty.http2.hpack.HpackException;
|
||||
import org.eclipse.jetty.io.ByteBufferPool;
|
||||
|
@ -195,11 +196,11 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
|
||||
// If the stream has been reset or removed,
|
||||
// don't send the frame and fail it here.
|
||||
if (entry.isStale())
|
||||
if (entry.shouldBeDropped())
|
||||
{
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Stale {}", entry);
|
||||
entry.failed(new EofException("reset"));
|
||||
LOG.debug("Dropped {}", entry);
|
||||
entry.failed(new EofException("dropped"));
|
||||
pending.remove();
|
||||
continue;
|
||||
}
|
||||
|
@ -450,40 +451,47 @@ public class HTTP2Flusher extends IteratingCallback implements Dumpable
|
|||
}
|
||||
|
||||
/**
|
||||
* @return whether the entry is stale and must not be processed
|
||||
* @return whether the entry should not be processed
|
||||
*/
|
||||
private boolean isStale()
|
||||
{
|
||||
// If it is a protocol frame, process it.
|
||||
if (isProtocolFrame(frame))
|
||||
return false;
|
||||
// It's an application frame; is the stream gone already?
|
||||
if (stream == null)
|
||||
return true;
|
||||
return stream.isResetOrFailed();
|
||||
}
|
||||
|
||||
private boolean isProtocolFrame(Frame frame)
|
||||
private boolean shouldBeDropped()
|
||||
{
|
||||
switch (frame.getType())
|
||||
{
|
||||
case DATA:
|
||||
case HEADERS:
|
||||
case PUSH_PROMISE:
|
||||
case CONTINUATION:
|
||||
return false;
|
||||
// Frames of this type should not be dropped.
|
||||
case PRIORITY:
|
||||
case RST_STREAM:
|
||||
case SETTINGS:
|
||||
case PING:
|
||||
case GO_AWAY:
|
||||
case WINDOW_UPDATE:
|
||||
case PREFACE:
|
||||
case DISCONNECT:
|
||||
return true;
|
||||
return false;
|
||||
// Frames of this type follow the logic below.
|
||||
case DATA:
|
||||
case HEADERS:
|
||||
case PUSH_PROMISE:
|
||||
case CONTINUATION:
|
||||
case RST_STREAM:
|
||||
break;
|
||||
default:
|
||||
throw new IllegalStateException();
|
||||
}
|
||||
|
||||
// SPEC: section 6.4.
|
||||
if (frame.getType() == FrameType.RST_STREAM)
|
||||
return stream != null && stream.isLocal() && !stream.isCommitted();
|
||||
|
||||
// Frames that do not have a stream associated are dropped.
|
||||
if (stream == null)
|
||||
return true;
|
||||
|
||||
return stream.isResetOrFailed();
|
||||
}
|
||||
|
||||
void commit()
|
||||
{
|
||||
if (stream != null)
|
||||
stream.commit();
|
||||
}
|
||||
|
||||
@Override
|
||||
|
|
|
@ -47,6 +47,7 @@ import org.eclipse.jetty.http2.frames.FrameType;
|
|||
import org.eclipse.jetty.http2.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
import org.eclipse.jetty.http2.frames.PingFrame;
|
||||
import org.eclipse.jetty.http2.frames.PrefaceFrame;
|
||||
import org.eclipse.jetty.http2.frames.PriorityFrame;
|
||||
import org.eclipse.jetty.http2.frames.PushPromiseFrame;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
|
@ -1221,6 +1222,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
@Override
|
||||
public void succeeded()
|
||||
{
|
||||
commit();
|
||||
|
||||
bytesWritten.addAndGet(frameBytes);
|
||||
frameBytes = 0;
|
||||
|
||||
|
@ -2072,6 +2075,8 @@ public abstract class HTTP2Session extends ContainerLifeCycle implements ISessio
|
|||
return false;
|
||||
|
||||
stream.setListener(listener);
|
||||
stream.process(new PrefaceFrame(), Callback.NOOP);
|
||||
|
||||
Callback streamCallback = Callback.from(() -> promise.succeeded(stream), x ->
|
||||
{
|
||||
HTTP2Session.this.onStreamDestroyed(streamId);
|
||||
|
|
|
@ -67,6 +67,7 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
private boolean remoteReset;
|
||||
private Listener listener;
|
||||
private long dataLength;
|
||||
private boolean committed;
|
||||
|
||||
public HTTP2Stream(Scheduler scheduler, ISession session, int streamId, boolean local)
|
||||
{
|
||||
|
@ -226,6 +227,18 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
return closeState.get() == CloseState.LOCALLY_CLOSED;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void commit()
|
||||
{
|
||||
committed = true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isCommitted()
|
||||
{
|
||||
return committed;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isOpen()
|
||||
{
|
||||
|
@ -278,6 +291,11 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
notIdle();
|
||||
switch (frame.getType())
|
||||
{
|
||||
case PREFACE:
|
||||
{
|
||||
onNewStream(callback);
|
||||
break;
|
||||
}
|
||||
case HEADERS:
|
||||
{
|
||||
onHeaders((HeadersFrame)frame, callback);
|
||||
|
@ -315,6 +333,12 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
private void onNewStream(Callback callback)
|
||||
{
|
||||
notifyNewStream(this);
|
||||
callback.succeeded();
|
||||
}
|
||||
|
||||
private void onHeaders(HeadersFrame frame, Callback callback)
|
||||
{
|
||||
MetaData metaData = frame.getMetaData();
|
||||
|
@ -586,6 +610,22 @@ public class HTTP2Stream extends IdleTimeout implements IStream, Callback, Dumpa
|
|||
}
|
||||
}
|
||||
|
||||
private void notifyNewStream(Stream stream)
|
||||
{
|
||||
Listener listener = this.listener;
|
||||
if (listener != null)
|
||||
{
|
||||
try
|
||||
{
|
||||
listener.onNewStream(stream);
|
||||
}
|
||||
catch (Throwable x)
|
||||
{
|
||||
LOG.info("Failure while notifying listener {}", listener, x);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void notifyData(Stream stream, DataFrame frame, Callback callback)
|
||||
{
|
||||
Listener listener = this.listener;
|
||||
|
|
|
@ -126,6 +126,19 @@ public interface IStream extends Stream, Attachable, Closeable
|
|||
*/
|
||||
boolean isResetOrFailed();
|
||||
|
||||
/**
|
||||
* Marks this stream as committed.
|
||||
*
|
||||
* @see #isCommitted()
|
||||
*/
|
||||
void commit();
|
||||
|
||||
/**
|
||||
* @return whether bytes for this stream have been sent to the remote peer.
|
||||
* @see #commit()
|
||||
*/
|
||||
boolean isCommitted();
|
||||
|
||||
/**
|
||||
* <p>An ordered list of frames belonging to the same stream.</p>
|
||||
*/
|
||||
|
|
|
@ -138,6 +138,16 @@ public interface Stream
|
|||
*/
|
||||
interface Listener
|
||||
{
|
||||
/**
|
||||
* <p>Callback method invoked when a stream is created locally by
|
||||
* {@link Session#newStream(HeadersFrame, Promise, Listener)}.</p>
|
||||
*
|
||||
* @param stream the newly created stream
|
||||
*/
|
||||
public default void onNewStream(Stream stream)
|
||||
{
|
||||
}
|
||||
|
||||
/**
|
||||
* <p>Callback method invoked when a HEADERS frame representing the HTTP response has been received.</p>
|
||||
*
|
||||
|
|
|
@ -25,6 +25,7 @@ import org.eclipse.jetty.client.HttpReceiver;
|
|||
import org.eclipse.jetty.client.HttpSender;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.http2.ErrorCode;
|
||||
import org.eclipse.jetty.http2.IStream;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
|
@ -82,6 +83,8 @@ public class HttpChannelOverHTTP2 extends HttpChannel
|
|||
public void setStream(Stream stream)
|
||||
{
|
||||
this.stream = stream;
|
||||
if (stream != null)
|
||||
((IStream)stream).setAttachment(this);
|
||||
}
|
||||
|
||||
public boolean isFailed()
|
||||
|
|
|
@ -74,6 +74,12 @@ public class HttpReceiverOverHTTP2 extends HttpReceiver implements Stream.Listen
|
|||
contentNotifier.reset();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onNewStream(Stream stream)
|
||||
{
|
||||
getHttpChannel().setStream(stream);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onHeaders(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
|
|
|
@ -177,7 +177,7 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
stream.headers(trailersFrame, callback);
|
||||
}
|
||||
|
||||
private class HeadersPromise implements Promise<Stream>
|
||||
private static class HeadersPromise implements Promise<Stream>
|
||||
{
|
||||
private final HttpRequest request;
|
||||
private final Callback callback;
|
||||
|
@ -191,9 +191,6 @@ public class HttpSenderOverHTTP2 extends HttpSender
|
|||
@Override
|
||||
public void succeeded(Stream stream)
|
||||
{
|
||||
HttpChannelOverHTTP2 channel = getHttpChannel();
|
||||
channel.setStream(stream);
|
||||
((IStream)stream).setAttachment(channel);
|
||||
long idleTimeout = request.getIdleTimeout();
|
||||
if (idleTimeout >= 0)
|
||||
stream.setIdleTimeout(idleTimeout);
|
||||
|
|
|
@ -28,7 +28,9 @@ import java.util.concurrent.ConcurrentLinkedQueue;
|
|||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ForkJoinPool;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
import java.util.stream.IntStream;
|
||||
import javax.servlet.http.HttpServletRequest;
|
||||
import javax.servlet.http.HttpServletResponse;
|
||||
|
@ -41,9 +43,14 @@ import org.eclipse.jetty.client.MultiplexConnectionPool;
|
|||
import org.eclipse.jetty.client.api.ContentResponse;
|
||||
import org.eclipse.jetty.client.api.Response;
|
||||
import org.eclipse.jetty.client.api.Result;
|
||||
import org.eclipse.jetty.client.util.BytesContentProvider;
|
||||
import org.eclipse.jetty.http.HttpFields;
|
||||
import org.eclipse.jetty.http.HttpStatus;
|
||||
import org.eclipse.jetty.http.HttpVersion;
|
||||
import org.eclipse.jetty.http.MetaData;
|
||||
import org.eclipse.jetty.http2.api.Session;
|
||||
import org.eclipse.jetty.http2.api.Stream;
|
||||
import org.eclipse.jetty.http2.api.server.ServerSessionListener;
|
||||
import org.eclipse.jetty.http2.client.HTTP2Client;
|
||||
import org.eclipse.jetty.http2.frames.GoAwayFrame;
|
||||
import org.eclipse.jetty.http2.frames.HeadersFrame;
|
||||
|
@ -51,15 +58,23 @@ import org.eclipse.jetty.http2.frames.PingFrame;
|
|||
import org.eclipse.jetty.http2.frames.ResetFrame;
|
||||
import org.eclipse.jetty.http2.frames.SettingsFrame;
|
||||
import org.eclipse.jetty.http2.server.HTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.http2.server.RawHTTP2ServerConnectionFactory;
|
||||
import org.eclipse.jetty.io.AbstractEndPoint;
|
||||
import org.eclipse.jetty.io.Connection;
|
||||
import org.eclipse.jetty.io.EndPoint;
|
||||
import org.eclipse.jetty.server.Handler;
|
||||
import org.eclipse.jetty.server.HttpConfiguration;
|
||||
import org.eclipse.jetty.server.Request;
|
||||
import org.eclipse.jetty.util.Callback;
|
||||
import org.eclipse.jetty.util.Promise;
|
||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||
import org.hamcrest.Matchers;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
import static org.hamcrest.MatcherAssert.assertThat;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
public class MaxConcurrentStreamsTest extends AbstractTest
|
||||
|
@ -418,6 +433,118 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
assertTrue(latch.await(2 * timeout, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testTCPCongestedStreamTimesOut() throws Exception
|
||||
{
|
||||
CountDownLatch request1Latch = new CountDownLatch(1);
|
||||
RawHTTP2ServerConnectionFactory http2 = new RawHTTP2ServerConnectionFactory(new HttpConfiguration(), new ServerSessionListener.Adapter()
|
||||
{
|
||||
@Override
|
||||
public Stream.Listener onNewStream(Stream stream, HeadersFrame frame)
|
||||
{
|
||||
MetaData.Request request = (MetaData.Request)frame.getMetaData();
|
||||
switch (request.getURI().getPath())
|
||||
{
|
||||
case "/1":
|
||||
{
|
||||
// Do not return to cause TCP congestion.
|
||||
assertTrue(awaitLatch(request1Latch, 15, TimeUnit.SECONDS));
|
||||
MetaData.Response response1 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response1, null, true), Callback.NOOP);
|
||||
break;
|
||||
}
|
||||
case "/3":
|
||||
{
|
||||
MetaData.Response response3 = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.OK_200, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response3, null, true), Callback.NOOP);
|
||||
break;
|
||||
}
|
||||
default:
|
||||
{
|
||||
MetaData.Response response = new MetaData.Response(HttpVersion.HTTP_2, HttpStatus.INTERNAL_SERVER_ERROR_500, new HttpFields());
|
||||
stream.headers(new HeadersFrame(stream.getId(), response, null, true), Callback.NOOP);
|
||||
break;
|
||||
}
|
||||
}
|
||||
// Return a Stream listener that consumes the content.
|
||||
return new Stream.Listener.Adapter();
|
||||
}
|
||||
});
|
||||
http2.setMaxConcurrentStreams(2);
|
||||
// Set the HTTP/2 flow control windows very large so we can
|
||||
// cause TCP congestion, not HTTP/2 flow control congestion.
|
||||
http2.setInitialSessionRecvWindow(512 * 1024 * 1024);
|
||||
http2.setInitialStreamRecvWindow(512 * 1024 * 1024);
|
||||
prepareServer(http2);
|
||||
server.start();
|
||||
|
||||
prepareClient();
|
||||
AtomicReference<AbstractEndPoint> clientEndPointRef = new AtomicReference<>();
|
||||
CountDownLatch clientEndPointLatch = new CountDownLatch(1);
|
||||
client = new HttpClient(new HttpClientTransportOverHTTP2(http2Client)
|
||||
{
|
||||
@Override
|
||||
public Connection newConnection(EndPoint endPoint, Map<String, Object> context) throws IOException
|
||||
{
|
||||
clientEndPointRef.set((AbstractEndPoint)endPoint);
|
||||
clientEndPointLatch.countDown();
|
||||
return super.newConnection(endPoint, context);
|
||||
}
|
||||
});
|
||||
client.setMaxConnectionsPerDestination(1);
|
||||
client.start();
|
||||
|
||||
// First request must cause TCP congestion.
|
||||
CountDownLatch response1Latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort()).path("/1")
|
||||
.content(new BytesContentProvider(new byte[64 * 1024 * 1024]))
|
||||
.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded(), String.valueOf(result.getFailure()));
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
response1Latch.countDown();
|
||||
});
|
||||
|
||||
// Wait until TCP congested.
|
||||
assertTrue(clientEndPointLatch.await(5, TimeUnit.SECONDS));
|
||||
AbstractEndPoint clientEndPoint = clientEndPointRef.get();
|
||||
long start = System.nanoTime();
|
||||
while (!clientEndPoint.getWriteFlusher().isPending())
|
||||
{
|
||||
long elapsed = System.nanoTime() - start;
|
||||
assertThat(TimeUnit.NANOSECONDS.toSeconds(elapsed), Matchers.lessThan(15L));
|
||||
Thread.sleep(100);
|
||||
}
|
||||
// Wait for the selector to update the SelectionKey to OP_WRITE.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Second request cannot be sent due to TCP congestion and times out.
|
||||
assertThrows(TimeoutException.class, () -> client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/2")
|
||||
.timeout(1000, TimeUnit.MILLISECONDS)
|
||||
.send());
|
||||
|
||||
// Third request should succeed.
|
||||
CountDownLatch response3Latch = new CountDownLatch(1);
|
||||
client.newRequest("localhost", connector.getLocalPort())
|
||||
.path("/3")
|
||||
.send(result ->
|
||||
{
|
||||
assertTrue(result.isSucceeded(), String.valueOf(result.getFailure()));
|
||||
assertEquals(HttpStatus.OK_200, result.getResponse().getStatus());
|
||||
response3Latch.countDown();
|
||||
});
|
||||
|
||||
// Wait for the third request to generate the HTTP/2 stream.
|
||||
Thread.sleep(1000);
|
||||
|
||||
// Resolve the TCP congestion.
|
||||
request1Latch.countDown();
|
||||
|
||||
assertTrue(response1Latch.await(15, TimeUnit.SECONDS));
|
||||
assertTrue(response3Latch.await(5, TimeUnit.SECONDS));
|
||||
}
|
||||
|
||||
private void primeConnection() throws Exception
|
||||
{
|
||||
// Prime the connection so that the maxConcurrentStream setting arrives to the client.
|
||||
|
@ -438,6 +565,18 @@ public class MaxConcurrentStreamsTest extends AbstractTest
|
|||
}
|
||||
}
|
||||
|
||||
private boolean awaitLatch(CountDownLatch latch, long time, TimeUnit unit)
|
||||
{
|
||||
try
|
||||
{
|
||||
return latch.await(time, unit);
|
||||
}
|
||||
catch (InterruptedException x)
|
||||
{
|
||||
throw new RuntimeException(x);
|
||||
}
|
||||
}
|
||||
|
||||
private static class Wrapper implements Session.Listener
|
||||
{
|
||||
private final Session.Listener listener;
|
||||
|
|
Loading…
Reference in New Issue