From 3c019fabd38c73e5052e19f23cb59b4cd3ae8746 Mon Sep 17 00:00:00 2001 From: Ludovic Orban Date: Wed, 17 Nov 2021 16:32:01 +0100 Subject: [PATCH] #7107: fix too-eager closing of multiplexed connections marked as closed but still in use --- .../jetty/client/AbstractConnectionPool.java | 28 ++- .../jetty/client/ConnectionPoolTest.java | 45 ++-- .../http2/client/http/ConnectionPoolTest.java | 216 ++++++++++++++++++ 3 files changed, 262 insertions(+), 27 deletions(-) create mode 100644 jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/ConnectionPoolTest.java diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index e2a3b539563..36407cb9b4c 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -274,6 +274,8 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen if (entry == null) { pending.decrementAndGet(); + if (LOG.isDebugEnabled()) + LOG.debug("Not creating connection as pool is full, pending: {}", pending); return; } @@ -321,7 +323,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen EntryHolder holder = (EntryHolder)((Attachable)connection).getAttachment(); if (holder.isExpired(maxDurationNanos)) { - boolean canClose = remove(connection, true); + boolean canClose = remove(connection); if (canClose) IO.close(connection); if (LOG.isDebugEnabled()) @@ -368,13 +370,23 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen EntryHolder holder = (EntryHolder)attachable.getAttachment(); if (holder == null) return true; - boolean reusable = pool.release(holder.entry); - if (LOG.isDebugEnabled()) - LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool); - if (reusable) - return true; - remove(connection); - return false; + + long maxDurationNanos = this.maxDurationNanos; + if (maxDurationNanos > 0L && holder.isExpired(maxDurationNanos)) + { + // Remove instead of release if the connection expired. + return !remove(connection); + } + else + { + // Release if the connection has not expired, then remove if not reusable. + boolean reusable = pool.release(holder.entry); + if (LOG.isDebugEnabled()) + LOG.debug("Released ({}) {} {}", reusable, holder.entry, pool); + if (reusable) + return true; + return !remove(connection); + } } @Override diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 93bb1702fd1..fef67212c73 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -59,31 +59,36 @@ import static org.junit.jupiter.api.Assertions.assertTrue; public class ConnectionPoolTest { - private Server server; - private ServerConnector connector; - private HttpClient client; + private static final ConnectionPoolFactory DUPLEX = new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)); + private static final ConnectionPoolFactory MULTIPLEX = new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)); + private static final ConnectionPoolFactory RANDOM = new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)); + private static final ConnectionPoolFactory DUPLEX_MAX_DURATION = new ConnectionPoolFactory("duplex-maxDuration", destination -> + { + DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination); + pool.setMaxDuration(10); + return pool; + }); + private static final ConnectionPoolFactory ROUND_ROBIN = new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)); public static Stream pools() { - return Stream.concat(poolsNoRoundRobin(), - Stream.of(new ConnectionPoolFactory("round-robin", destination -> new RoundRobinConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)))); + return Stream.of(DUPLEX, MULTIPLEX, RANDOM, DUPLEX_MAX_DURATION, ROUND_ROBIN); + } + + public static Stream poolsNoMaxDuration() + { + return Stream.of(DUPLEX, MULTIPLEX, RANDOM, ROUND_ROBIN); } public static Stream poolsNoRoundRobin() { - return Stream.of( - new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), - new ConnectionPoolFactory("duplex-maxDuration", destination -> - { - DuplexConnectionPool pool = new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination); - pool.setMaxDuration(10); - return pool; - }), - new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)), - new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) - ); + return Stream.of(DUPLEX, MULTIPLEX, RANDOM, DUPLEX_MAX_DURATION); } + private Server server; + private ServerConnector connector; + private HttpClient client; + private void start(ConnectionPool.Factory factory, Handler handler) throws Exception { startServer(handler); @@ -371,7 +376,8 @@ public class ConnectionPoolTest } @ParameterizedTest - @MethodSource("pools") + // Connection pool aggressively closes expired connections upon release, which interferes with this test's assertion. + @MethodSource("poolsNoMaxDuration") public void testConcurrentRequestsAllBlockedOnServerWithLargeConnectionPool(ConnectionPoolFactory factory) throws Exception { int count = 50; @@ -379,14 +385,15 @@ public class ConnectionPoolTest } @ParameterizedTest - @MethodSource("pools") + // Connection pool aggressively closes expired connections upon release, which interferes with this test's assertion. + @MethodSource("poolsNoMaxDuration") public void testConcurrentRequestsAllBlockedOnServerWithExactConnectionPool(ConnectionPoolFactory factory) throws Exception { int count = 50; testConcurrentRequestsAllBlockedOnServer(factory, count, count); } - private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception + private void testConcurrentRequestsAllBlockedOnServer(ConnectionPoolFactory factory, int count, int maxConnections) throws Exception { CyclicBarrier barrier = new CyclicBarrier(count); diff --git a/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/ConnectionPoolTest.java b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/ConnectionPoolTest.java new file mode 100644 index 00000000000..f41fdbe675f --- /dev/null +++ b/jetty-http2/http2-http-client-transport/src/test/java/org/eclipse/jetty/http2/client/http/ConnectionPoolTest.java @@ -0,0 +1,216 @@ +// +// ======================================================================== +// Copyright (c) 1995-2021 Mort Bay Consulting Pty Ltd and others. +// +// This program and the accompanying materials are made available under the +// terms of the Eclipse Public License v. 2.0 which is available at +// https://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 +// which is available at https://www.apache.org/licenses/LICENSE-2.0. +// +// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 +// ======================================================================== +// + +package org.eclipse.jetty.http2.client.http; + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import javax.servlet.ServletException; +import javax.servlet.http.HttpServletRequest; +import javax.servlet.http.HttpServletResponse; + +import org.eclipse.jetty.client.ConnectionPool; +import org.eclipse.jetty.client.HttpClient; +import org.eclipse.jetty.client.HttpClientTransport; +import org.eclipse.jetty.client.MultiplexConnectionPool; +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.http2.client.HTTP2Client; +import org.eclipse.jetty.http2.server.HTTP2CServerConnectionFactory; +import org.eclipse.jetty.io.ClientConnector; +import org.eclipse.jetty.server.Handler; +import org.eclipse.jetty.server.HttpConfiguration; +import org.eclipse.jetty.server.Server; +import org.eclipse.jetty.server.ServerConnector; +import org.eclipse.jetty.util.Pool; +import org.hamcrest.Matchers; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ConnectionPoolTest +{ + private static final Logger LOG = LoggerFactory.getLogger(ConnectionPoolTest.class); + + private Server server; + private ServerConnector connector; + private HttpClient client; + + private void startServer(Handler handler) throws Exception + { + server = new Server(); + connector = new ServerConnector(server); + HttpConfiguration httpConfig = new HttpConfiguration(); + HTTP2CServerConnectionFactory connectionFactory = new HTTP2CServerConnectionFactory(httpConfig); + connector.setConnectionFactories(List.of(connectionFactory)); + server.addConnector(connector); + server.setHandler(handler); + server.start(); + } + + @AfterEach + public void disposeServer() throws Exception + { + connector = null; + if (server != null) + { + server.stop(); + server = null; + } + } + + @AfterEach + public void disposeClient() throws Exception + { + if (client != null) + { + client.stop(); + client = null; + } + } + + @Test + public void testMaxDurationConnectionsWithMultiplexedPool() throws Exception + { + final int maxDuration = 200; + AtomicInteger poolCreateCounter = new AtomicInteger(); + AtomicInteger poolRemoveCounter = new AtomicInteger(); + ConnectionPoolFactory factory = new ConnectionPoolFactory("MaxDurationConnectionsWithMultiplexedPool", destination -> + { + MultiplexConnectionPool pool = new MultiplexConnectionPool(destination, Pool.StrategyType.FIRST, 2, false, destination, 10) + { + @Override + protected void onCreated(Connection connection) + { + poolCreateCounter.incrementAndGet(); + } + + @Override + protected void removed(Connection connection) + { + poolRemoveCounter.incrementAndGet(); + } + }; + pool.setMaxDuration(maxDuration); + return pool; + }); + + CountDownLatch[] reqExecutingLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + CountDownLatch[] reqExecutedLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + CountDownLatch[] reqFinishingLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + startServer(new EmptyServerHandler() + { + @Override + protected void service(String target, org.eclipse.jetty.server.Request jettyRequest, HttpServletRequest request, HttpServletResponse response) throws ServletException + { + int req = Integer.parseInt(target.substring(1)); + try + { + LOG.debug("req {} is executing", req); + reqExecutingLatches[req].countDown(); + Thread.sleep(250); + reqExecutedLatches[req].countDown(); + LOG.debug("req {} executed", req); + + assertTrue(reqFinishingLatches[req].await(5, TimeUnit.SECONDS)); + + response.getWriter().println("req " + req + " executed"); + response.getWriter().flush(); + LOG.debug("req {} successful", req); + } + catch (Exception e) + { + throw new ServletException(e); + } + } + }); + + ClientConnector clientConnector = new ClientConnector(); + HttpClientTransport transport = new HttpClientTransportOverHTTP2(new HTTP2Client(clientConnector)); + transport.setConnectionPoolFactory(factory.factory); + client = new HttpClient(transport); + client.start(); + + CountDownLatch[] reqClientDoneLatches = new CountDownLatch[] {new CountDownLatch(1), new CountDownLatch(1), new CountDownLatch(1)}; + + sendRequest(reqClientDoneLatches, 0); + // wait until handler is executing + assertTrue(reqExecutingLatches[0].await(5, TimeUnit.SECONDS)); + LOG.debug("req 0 executing"); + + sendRequest(reqClientDoneLatches, 1); + // wait until handler executed sleep + assertTrue(reqExecutedLatches[1].await(5, TimeUnit.SECONDS)); + LOG.debug("req 1 executed"); + + // Now the pool contains one connection that is expired but in use by 2 threads. + + sendRequest(reqClientDoneLatches, 2); + LOG.debug("req2 sent"); + assertTrue(reqExecutingLatches[2].await(5, TimeUnit.SECONDS)); + LOG.debug("req2 executing"); + + // The 3rd request has tried the expired request and marked it as closed as it has expired, then used a 2nd one. + + // release and wait for req2 to be done before releasing req1 + reqFinishingLatches[2].countDown(); + assertTrue(reqClientDoneLatches[2].await(5, TimeUnit.SECONDS)); + reqFinishingLatches[1].countDown(); + + // release req0 once req1 is done; req 1 should not have closed the response as req 0 is still running + assertTrue(reqClientDoneLatches[1].await(5, TimeUnit.SECONDS)); + reqFinishingLatches[0].countDown(); + assertTrue(reqClientDoneLatches[0].await(5, TimeUnit.SECONDS)); + + // Check that the pool created 2 and removed 2 connections; + // 2 were removed b/c waiting for req 2 means the 2nd connection + // expired and has to be removed and closed upon being returned to the pool. + assertThat(poolCreateCounter.get(), Matchers.is(2)); + assertThat(poolRemoveCounter.get(), Matchers.is(2)); + } + + private void sendRequest(CountDownLatch[] reqClientDoneLatches, int i) + { + client.newRequest("localhost", connector.getLocalPort()) + .path("/" + i) + .timeout(5, TimeUnit.SECONDS) + .send(result -> + { + assertThat("req " + i + " failed", result.getResponse().getStatus(), Matchers.is(200)); + reqClientDoneLatches[i].countDown(); + }); + } + + private static class ConnectionPoolFactory + { + private final String name; + private final ConnectionPool.Factory factory; + + private ConnectionPoolFactory(String name, ConnectionPool.Factory factory) + { + this.name = name; + this.factory = factory; + } + + @Override + public String toString() + { + return name; + } + } +}