diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java index 6ec1ee47425..4832590f7cb 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/AbstractConnectionPool.java @@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable private final HttpDestination destination; private final Callback requester; private final Pool pool; + private boolean maximizeConnections; /** * @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead @@ -151,11 +152,28 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable } @Override + @ManagedAttribute("Whether this pool is closed") public boolean isClosed() { return pool.isClosed(); } + @ManagedAttribute("Whether the pool tries to maximize the number of connections used") + public boolean isMaximizeConnections() + { + return maximizeConnections; + } + + /** + *

Sets whether the number of connections should be maximized.

+ * + * @param maximizeConnections whether the number of connections should be maximized + */ + public void setMaximizeConnections(boolean maximizeConnections) + { + this.maximizeConnections = maximizeConnections; + } + @Override public Connection acquire() { @@ -164,7 +182,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable /** *

Returns an idle connection, if available; - * if an idle connection is not available, and the given {@code create} parameter is {@code true}, + * if an idle connection is not available, and the given {@code create} parameter is {@code true} + * or {@link #isMaximizeConnections()} is {@code true}, * then schedules the opening of a new connection, if possible within the configuration of this * connection pool (for example, if it does not exceed the max connection count); * otherwise returns {@code null}.

@@ -178,7 +197,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable if (LOG.isDebugEnabled()) LOG.debug("Acquiring create={} on {}", create, this); Connection connection = activate(); - if (connection == null && create) + if (connection == null && (create || isMaximizeConnections())) { tryCreate(destination.getQueuedRequestCount()); connection = activate(); @@ -357,8 +376,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable } /** - * @deprecated Relying on this method indicates a reliance on the implementation details. * @return an unmodifiable queue working as a view of the idle connections. + * @deprecated Relying on this method indicates a reliance on the implementation details. */ @Deprecated public Queue getIdleConnections() @@ -371,8 +390,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable } /** - * @deprecated Relying on this method indicates a reliance on the implementation details. * @return an unmodifiable collection working as a view of the active connections. + * @deprecated Relying on this method indicates a reliance on the implementation details. */ @Deprecated public Collection getActiveConnections() diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java new file mode 100644 index 00000000000..587a7934585 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/IndexedConnectionPool.java @@ -0,0 +1,79 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import org.eclipse.jetty.client.api.Connection; +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.Pool; +import org.eclipse.jetty.util.annotation.ManagedObject; +import org.eclipse.jetty.util.log.Log; +import org.eclipse.jetty.util.log.Logger; + +/** + *

A {@link MultiplexConnectionPool} that picks connections at a particular + * index between {@code 0} and {@link #getMaxConnectionCount()}.

+ *

The algorithm that decides the index value is decided by subclasses.

+ *

To acquire a connection, this class obtains the index value and attempts + * to activate the pool entry at that index. + * If this activation fails, another attempt to activate an alternative pool + * entry is performed, to avoid stalling connection acquisition if there is + * an available entry at a different index.

+ */ +@ManagedObject +public abstract class IndexedConnectionPool extends MultiplexConnectionPool +{ + private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class); + + private final Pool pool; + + public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) + { + super(destination, maxConnections, false, requester, maxMultiplex); + pool = destination.getBean(Pool.class); + } + + /** + *

Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive) + * used to attempt to acquire the connection at that index in the pool.

+ * + * @param maxConnections the upper bound of the index (exclusive) + * @return an index between 0 (inclusive) and {@code maxConnections} (exclusive) + */ + protected abstract int getIndex(int maxConnections); + + @Override + protected Connection activate() + { + int index = getIndex(getMaxConnectionCount()); + Pool.Entry entry = pool.acquireAt(index); + if (LOG.isDebugEnabled()) + LOG.debug("activating at index={} entry={}", index, entry); + if (entry == null) + { + entry = pool.acquire(); + if (LOG.isDebugEnabled()) + LOG.debug("activating alternative entry={}", entry); + } + if (entry == null) + return null; + Connection connection = entry.getPooled(); + acquired(connection); + return connection; + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java new file mode 100644 index 00000000000..6eb44042a90 --- /dev/null +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RandomConnectionPool.java @@ -0,0 +1,43 @@ +// +// ======================================================================== +// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others. +// ------------------------------------------------------------------------ +// All rights reserved. This program and the accompanying materials +// are made available under the terms of the Eclipse Public License v1.0 +// and Apache License v2.0 which accompanies this distribution. +// +// The Eclipse Public License is available at +// http://www.eclipse.org/legal/epl-v10.html +// +// The Apache License v2.0 is available at +// http://www.opensource.org/licenses/apache2.0.php +// +// You may elect to redistribute this code under either of these licenses. +// ======================================================================== +// + +package org.eclipse.jetty.client; + +import java.util.concurrent.ThreadLocalRandom; + +import org.eclipse.jetty.util.Callback; +import org.eclipse.jetty.util.annotation.ManagedObject; + +/** + *

An indexed {@link ConnectionPool} that provides connections + * randomly among the ones that are available.

+ */ +@ManagedObject +public class RandomConnectionPool extends IndexedConnectionPool +{ + public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) + { + super(destination, maxConnections, requester, maxMultiplex); + } + + @Override + protected int getIndex(int maxConnections) + { + return ThreadLocalRandom.current().nextInt(maxConnections); + } +} diff --git a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java index 7b909f01d10..d9cb2c590e5 100644 --- a/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java +++ b/jetty-client/src/main/java/org/eclipse/jetty/client/RoundRobinConnectionPool.java @@ -18,22 +18,39 @@ package org.eclipse.jetty.client; -import org.eclipse.jetty.client.api.Connection; +import java.util.concurrent.atomic.AtomicInteger; + import org.eclipse.jetty.util.Callback; -import org.eclipse.jetty.util.Pool; import org.eclipse.jetty.util.annotation.ManagedObject; -import org.eclipse.jetty.util.log.Log; -import org.eclipse.jetty.util.log.Logger; -import org.eclipse.jetty.util.thread.Locker; +/** + *

A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.

+ *

The round-robin behavior is almost impossible to achieve for several reasons:

+ *
    + *
  • the server takes different times to serve different requests; if a request takes a long + * time to be processed by the server, it would be a performance penalty to stall sending requests + * waiting for that connection to be available - better skip it and try another connection
  • + *
  • connections may be closed by the client or by the server, so it should be a performance + * penalty to stall sending requests waiting for a new connection to be opened
  • + *
  • thread scheduling on both client and server may temporarily penalize a connection
  • + *
+ *

Do not expect this class to provide connections in a perfect recurring sequence such as + * {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to + * achieve in a real environment. + * This class will just attempt a best-effort to provide the connections in a sequential order, + * but most likely the order will be quasi-random.

+ *

Applications using this class should {@link #preCreateConnections(int) pre-create} + * the connections to ensure that they are already opened when the application starts to requests + * them, otherwise the first connection that is opened may be used multiple times before the others + * are opened, resulting in a behavior that is more random-like than more round-robin-like (and + * that confirms that round-robin behavior is almost impossible to achieve).

+ * + * @see RandomConnectionPool + */ @ManagedObject -public class RoundRobinConnectionPool extends MultiplexConnectionPool +public class RoundRobinConnectionPool extends IndexedConnectionPool { - private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class); - - private final Locker lock = new Locker(); - private final Pool pool; - private int offset; + private final AtomicInteger offset = new AtomicInteger(); public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) { @@ -42,37 +59,17 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) { - super(destination, maxConnections, false, requester, maxMultiplex); - pool = destination.getBean(Pool.class); - } - - @Override - protected Connection acquire(boolean create) - { + super(destination, maxConnections, requester, maxMultiplex); // If there are queued requests and connections get // closed due to idle timeout or overuse, we want to // aggressively try to open new connections to replace // those that were closed to process queued requests. - return super.acquire(true); + setMaximizeConnections(true); } @Override - protected Connection activate() + protected int getIndex(int maxConnections) { - Pool.Entry entry; - try (Locker.Lock l = lock.lock()) - { - int index = Math.abs(offset % pool.getMaxEntries()); - entry = pool.acquireAt(index); - if (LOG.isDebugEnabled()) - LOG.debug("activated at index={} entry={}", index, entry); - if (entry != null) - ++offset; - } - if (entry == null) - return null; - Connection connection = entry.getPooled(); - acquired(connection); - return connection; + return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v); } } diff --git a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java index 334b9b6a6d6..0a679a1b155 100644 --- a/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java +++ b/jetty-client/src/test/java/org/eclipse/jetty/client/ConnectionPoolTest.java @@ -71,7 +71,8 @@ public class ConnectionPoolTest { return Stream.of( new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), - new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) + new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)), + new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) ); } diff --git a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java index ad63a8e2d29..5df851017e5 100644 --- a/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java +++ b/tests/test-http-client-transport/src/test/java/org/eclipse/jetty/http/client/RoundRobinConnectionPoolTest.java @@ -21,6 +21,7 @@ package org.eclipse.jetty.http.client; import java.io.IOException; import java.util.List; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CountDownLatch; import java.util.concurrent.CyclicBarrier; @@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ArgumentsSource; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.lessThanOrEqualTo; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -71,17 +73,21 @@ public class RoundRobinConnectionPoolTest extends AbstractTest new RoundRobinConnectionPool(destination, maxConnections, destination)); - - // Prime the connections, so that they are all opened - // before we actually test the round robin behavior. - for (int i = 0; i < maxConnections; ++i) + CompletableFuture setup = new CompletableFuture<>(); + scenario.client.getTransport().setConnectionPoolFactory(destination -> { - ContentResponse response = scenario.client.newRequest(scenario.newURI()) - .timeout(5, TimeUnit.SECONDS) - .send(); - assertEquals(HttpStatus.OK_200, response.getStatus()); - } + RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination); + pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null)); + return pool; + }); + + // Send one request to trigger destination creation + // and connection pool pre-creation of connections, + // so we can test reliably the round-robin behavior. + scenario.client.newRequest(scenario.newURI()) + .timeout(5, TimeUnit.SECONDS) + .send(); + setup.get(5, TimeUnit.SECONDS); record.set(true); int requests = 2 * maxConnections - 1; @@ -118,6 +124,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest remotePorts = new CopyOnWriteArrayList<>(); AtomicReference requestLatch = new AtomicReference<>(); CountDownLatch serverLatch = new CountDownLatch(count); @@ -129,10 +136,13 @@ public class RoundRobinConnectionPoolTest extends AbstractTest new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex)); + CompletableFuture setup = new CompletableFuture<>(); + scenario.client.getTransport().setConnectionPoolFactory(destination -> + { + RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination); + pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null)); + return pool; + }); - // Do not prime the connections, to see if the behavior is - // correct even if the connections are not pre-created. + // Send one request to trigger destination creation + // and connection pool pre-creation of connections, + // so we can test reliably the round-robin behavior. + scenario.client.newRequest(scenario.newURI()) + .timeout(5, TimeUnit.SECONDS) + .send(); + setup.get(5, TimeUnit.SECONDS); + record.set(true); CountDownLatch clientLatch = new CountDownLatch(count); AtomicInteger requests = new AtomicInteger(); for (int i = 0; i < count; ++i) @@ -171,7 +193,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest remotePorts = new CopyOnWriteArrayList<>(); scenario.start(new EmptyServerHandler() @@ -229,9 +251,14 @@ public class RoundRobinConnectionPoolTest extends AbstractTest number_of_times_port_was_used}. Map results = remotePorts.stream() .collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); - assertEquals(count / maxUsage, results.size(), remotePorts.toString()); - assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString()); + // RoundRobinConnectionPool may open more connections than expected. + // For example with maxUsage=2, requests could be sent to these ports: + // [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7] + // Opening p5 and p6 was delayed, so the opening of p7 was triggered + // to replace p4 while p5 and p6 were busy sending their requests. + assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size())); } }