From 08724d8e705cb7f74131625302b392da04349a3a Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Tue, 23 Nov 2021 11:47:12 +0100 Subject: [PATCH] #7157 add missing callback calls in H2 reset codepath --- .../org/eclipse/jetty/http2/HTTP2Stream.java | 21 +++- .../client/http/HttpChannelOverHTTP2.java | 9 +- .../http/MultiplexedConnectionPoolTest.java | 99 ++++++++++++++++++- 3 files changed, 122 insertions(+), 7 deletions(-) diff --git a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java index a7783f57f7b..c60adf386ab 100644 --- a/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java +++ b/jetty-http2/http2-common/src/main/java/org/eclipse/jetty/http2/HTTP2Stream.java @@ -152,14 +152,23 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts. @Override public void reset(ResetFrame frame, Callback callback) { + Throwable resetFailure = null; try (AutoLock l = lock.lock()) { if (isReset()) - return; - localReset = true; - failure = new EOFException("reset"); + { + resetFailure = failure; + } + else + { + localReset = true; + failure = new EOFException("reset"); + } } - ((HTTP2Session)session).reset(this, frame, callback); + if (resetFailure != null) + callback.failed(resetFailure); + else + ((HTTP2Session)session).reset(this, frame, callback); } private boolean startWrite(Callback callback) @@ -541,6 +550,8 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts. close(); if (session.removeStream(this)) notifyReset(this, frame, callback); + else + callback.succeeded(); } private void onPush(PushPromiseFrame frame, Callback callback) @@ -565,6 +576,8 @@ public class HTTP2Stream implements IStream, Callback, Dumpable, CyclicTimeouts. close(); if (session.removeStream(this)) notifyFailure(this, frame, callback); + else + callback.succeeded(); } @Override diff --git a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java index 3bc0904be1e..91537279920 100644 --- a/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java +++ b/jetty-http2/http2-http-client-transport/src/main/java/org/eclipse/jetty/http2/client/http/HttpChannelOverHTTP2.java @@ -106,7 +106,10 @@ public class HttpChannelOverHTTP2 extends HttpChannel public void release() { setStream(null); - if (connection.release(this)) + boolean released = connection.release(this); + if (LOG.isDebugEnabled()) + LOG.debug("released channel? {} {}", released, this); + if (released) getHttpDestination().release(getHttpConnection()); } @@ -114,13 +117,15 @@ public class HttpChannelOverHTTP2 extends HttpChannel public void exchangeTerminated(HttpExchange exchange, Result result) { super.exchangeTerminated(exchange, result); + Stream stream = getStream(); + if (LOG.isDebugEnabled()) + LOG.debug("exchange terminated {} {}", result, stream); if (result.isSucceeded()) { release(); } else { - Stream stream = getStream(); if (stream != null) stream.reset(new ResetFrame(stream.getId(), ErrorCode.CANCEL_STREAM_ERROR.code), new ReleaseCallback()); else diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java index a3a86948cfb..ec3fff6a03d 100644 --- a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/MultiplexedConnectionPoolTest.java @@ -13,6 +13,9 @@ package org.eclipse.jetty.http2.client.http; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; @@ -43,6 +46,7 @@ import org.junit.jupiter.api.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.awaitility.Awaitility.await; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.is; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -58,10 +62,17 @@ public class MultiplexedConnectionPoolTest private HttpClient client; private void startServer(Handler handler) throws Exception + { + startServer(handler, MAX_MULTIPLEX, -1L); + } + + private void startServer(Handler handler, int maxConcurrentStreams, long streamIdleTimeout) throws Exception { server = new Server(); HTTP2ServerConnectionFactory http2ServerConnectionFactory = new HTTP2ServerConnectionFactory(new HttpConfiguration()); - http2ServerConnectionFactory.setMaxConcurrentStreams(MAX_MULTIPLEX); + http2ServerConnectionFactory.setMaxConcurrentStreams(maxConcurrentStreams); + if (streamIdleTimeout > 0) + http2ServerConnectionFactory.setStreamIdleTimeout(streamIdleTimeout); connector = new ServerConnector(server, 1, 1, http2ServerConnectionFactory); server.addConnector(connector); server.setHandler(handler); @@ -205,6 +216,92 @@ public class MultiplexedConnectionPoolTest }); } + @Test + public void testStreamIdleTimeout() throws Exception + { + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + AtomicReference> poolRef = new AtomicReference<>(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("StreamIdleTimeout", destination -> + { + int maxConnections = destination.getHttpClient().getMaxConnectionsPerDestination(); + MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, maxConnections, false, destination, 10) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + poolRef.set(pool.getBean(Pool.class)); + return pool; + }); + + startServer(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + int req = Integer.parseInt(target.substring(1)); + try + { + response.getWriter().println("req " + req + " executed"); + response.getWriter().flush(); + } + catch (Exception e) + { + throw new ServletException(e); + } + } + }, 64, 1L); + + ClientConnector clientConnector = new ClientConnector(); + HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client(clientConnector)); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport); + client.start(); + + List> futures = new ArrayList<>(); + AtomicInteger counter = new AtomicInteger(); + for (int i = 0; i < 100; i++) + { + CompletableFuture cf = new CompletableFuture<>(); + client.newRequest("localhost", connector.getLocalPort()) + .path("/" + i) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + counter.incrementAndGet(); + cf.complete(null); + }); + futures.add(cf); + } + + // Wait for all requests to complete. + for (CompletableFuture cf : futures) + { + cf.get(5, TimeUnit.SECONDS); + } + assertThat(counter.get(), is(100)); + + // All remaining pooled connections should be in IDLE state. + await().atMost(5, TimeUnit.SECONDS).until(() -> + { + for (Pool.Entry value : poolRef.get().values()) + { + if (!value.isIdle()) + return false; + } + return true; + }); + } + @Test public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception {