Fixes #5217 - Review RoundRobinConnectionPool (#5219)

* Fixes #5217 - Review RoundRobinConnectionPool

Introduced IndexedConnectionPool and RandomConnectionPool.
Clarified semantic of RoundRobinConnectionPool.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-09-09 15:31:28 +02:00 committed by GitHub
parent 7ecf42e3f8
commit 01135e1515
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 229 additions and 63 deletions

View File

@ -48,6 +48,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private final HttpDestination destination;
private final Callback requester;
private final Pool<Connection> pool;
private boolean maximizeConnections;
/**
* @deprecated use {@link #AbstractConnectionPool(HttpDestination, int, boolean, Callback)} instead
@ -151,11 +152,28 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
}
@Override
@ManagedAttribute("Whether this pool is closed")
public boolean isClosed()
{
return pool.isClosed();
}
@ManagedAttribute("Whether the pool tries to maximize the number of connections used")
public boolean isMaximizeConnections()
{
return maximizeConnections;
}
/**
* <p>Sets whether the number of connections should be maximized.</p>
*
* @param maximizeConnections whether the number of connections should be maximized
*/
public void setMaximizeConnections(boolean maximizeConnections)
{
this.maximizeConnections = maximizeConnections;
}
@Override
public Connection acquire()
{
@ -164,7 +182,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true},
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
@ -178,7 +197,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate();
if (connection == null && create)
if (connection == null && (create || isMaximizeConnections()))
{
tryCreate(destination.getQueuedRequestCount());
connection = activate();
@ -357,8 +376,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
}
/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable queue working as a view of the idle connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Queue<Connection> getIdleConnections()
@ -371,8 +390,8 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
}
/**
* @deprecated Relying on this method indicates a reliance on the implementation details.
* @return an unmodifiable collection working as a view of the active connections.
* @deprecated Relying on this method indicates a reliance on the implementation details.
*/
@Deprecated
public Collection<Connection> getActiveConnections()

View File

@ -0,0 +1,79 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
/**
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
* <p>The algorithm that decides the index value is decided by subclasses.</p>
* <p>To acquire a connection, this class obtains the index value and attempts
* to activate the pool entry at that index.
* If this activation fails, another attempt to activate an alternative pool
* entry is performed, to avoid stalling connection acquisition if there is
* an available entry at a different index.</p>
*/
@ManagedObject
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = Log.getLogger(IndexedConnectionPool.class);
private final Pool<Connection> pool;
public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
/**
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
* used to attempt to acquire the connection at that index in the pool.</p>
*
* @param maxConnections the upper bound of the index (exclusive)
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
*/
protected abstract int getIndex(int maxConnections);
@Override
protected Connection activate()
{
int index = getIndex(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activating at index={} entry={}", index, entry);
if (entry == null)
{
entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activating alternative entry={}", entry);
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}

View File

@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
// ------------------------------------------------------------------------
// All rights reserved. This program and the accompanying materials
// are made available under the terms of the Eclipse Public License v1.0
// and Apache License v2.0 which accompanies this distribution.
//
// The Eclipse Public License is available at
// http://www.eclipse.org/legal/epl-v10.html
//
// The Apache License v2.0 is available at
// http://www.opensource.org/licenses/apache2.0.php
//
// You may elect to redistribute this code under either of these licenses.
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>An indexed {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
}
@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
}
}

View File

@ -18,22 +18,39 @@
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.log.Log;
import org.eclipse.jetty.util.log.Logger;
import org.eclipse.jetty.util.thread.Locker;
/**
* <p>A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.</p>
* <p>The round-robin behavior is almost impossible to achieve for several reasons:</p>
* <ul>
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
* <p>Do not expect this class to provide connections in a perfect recurring sequence such as
* {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.</p>
* <p>Applications using this class should {@link #preCreateConnections(int) pre-create}
* the connections to ensure that they are already opened when the application starts to requests
* them, otherwise the first connection that is opened may be used multiple times before the others
* are opened, resulting in a behavior that is more random-like than more round-robin-like (and
* that confirms that round-robin behavior is almost impossible to achieve).</p>
*
* @see RandomConnectionPool
*/
@ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool
public class RoundRobinConnectionPool extends IndexedConnectionPool
{
private static final Logger LOG = Log.getLogger(RoundRobinConnectionPool.class);
private final Locker lock = new Locker();
private final Pool<Connection> pool;
private int offset;
private final AtomicInteger offset = new AtomicInteger();
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{
@ -42,37 +59,17 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
@Override
protected Connection acquire(boolean create)
{
super(destination, maxConnections, requester, maxMultiplex);
// If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace
// those that were closed to process queued requests.
return super.acquire(true);
setMaximizeConnections(true);
}
@Override
protected Connection activate()
protected int getIndex(int maxConnections)
{
Pool<Connection>.Entry entry;
try (Locker.Lock l = lock.lock())
{
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
}
}

View File

@ -71,7 +71,8 @@ public class ConnectionPoolTest
{
return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
);
}

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.http.client;
import java.io.IOException;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;
@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
@ -71,17 +73,21 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
});
int maxConnections = 3;
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination));
// Prime the connections, so that they are all opened
// before we actually test the round robin behavior.
for (int i = 0; i < maxConnections; ++i)
CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
ContentResponse response = scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
assertEquals(HttpStatus.OK_200, response.getStatus());
}
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
return pool;
});
// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);
record.set(true);
int requests = 2 * maxConnections - 1;
@ -118,6 +124,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int maxConnections = 3;
int count = maxConnections * maxMultiplex;
AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count);
@ -129,10 +136,13 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{
try
{
remotePorts.add(request.getRemotePort());
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
if (record.get())
{
remotePorts.add(request.getRemotePort());
requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
}
}
catch (Exception x)
{
@ -141,11 +151,23 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
}
});
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex));
CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
return pool;
});
// Do not prime the connections, to see if the behavior is
// correct even if the connections are not pre-created.
// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);
record.set(true);
CountDownLatch clientLatch = new CountDownLatch(count);
AtomicInteger requests = new AtomicInteger();
for (int i = 0; i < count; ++i)
@ -171,7 +193,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
barrier.await();
assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertThat(remotePorts.size(), Matchers.equalTo(count));
assertThat(remotePorts.toString(), remotePorts.size(), Matchers.equalTo(count));
for (int i = 0; i < count; ++i)
{
int base = i % maxConnections;
@ -194,9 +216,9 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
multiplex = 2;
int maxMultiplex = multiplex;
int maxUsage = 2;
int maxConnections = 2;
int count = maxConnections * maxMultiplex * maxUsage;
int maxUsage = 3;
int maxConnections = 4;
int count = 2 * maxConnections * maxMultiplex * maxUsage;
List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler()
@ -229,9 +251,14 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size());
// Maps {remote_port -> number_of_times_port_was_used}.
Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString());
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString());
// RoundRobinConnectionPool may open more connections than expected.
// For example with maxUsage=2, requests could be sent to these ports:
// [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7]
// Opening p5 and p6 was delayed, so the opening of p7 was triggered
// to replace p4 while p5 and p6 were busy sending their requests.
assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size()));
}
}