Merge branch 'jetty-9.4.x' into 'jetty-10.0.x'.

Signed-off-by: Simone Bordet <simone.bordet@gmail.com>
This commit is contained in:
Simone Bordet 2020-09-09 15:39:36 +02:00
commit 1fa2b091a2
6 changed files with 237 additions and 60 deletions

View File

@ -47,6 +47,7 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
private final HttpDestination destination; private final HttpDestination destination;
private final Callback requester; private final Callback requester;
private final Pool<Connection> pool; private final Pool<Connection> pool;
private boolean maximizeConnections;
protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester) protected AbstractConnectionPool(HttpDestination destination, int maxConnections, boolean cache, Callback requester)
{ {
@ -130,18 +131,47 @@ public abstract class AbstractConnectionPool implements ConnectionPool, Dumpable
} }
@Override @Override
@ManagedAttribute("Whether this pool is closed")
public boolean isClosed() public boolean isClosed()
{ {
return pool.isClosed(); return pool.isClosed();
} }
@ManagedAttribute("Whether the pool tries to maximize the number of connections used")
public boolean isMaximizeConnections()
{
return maximizeConnections;
}
/**
* <p>Sets whether the number of connections should be maximized.</p>
*
* @param maximizeConnections whether the number of connections should be maximized
*/
public void setMaximizeConnections(boolean maximizeConnections)
{
this.maximizeConnections = maximizeConnections;
}
/**
* <p>Returns an idle connection, if available;
* if an idle connection is not available, and the given {@code create} parameter is {@code true}
* or {@link #isMaximizeConnections()} is {@code true},
* then schedules the opening of a new connection, if possible within the configuration of this
* connection pool (for example, if it does not exceed the max connection count);
* otherwise returns {@code null}.</p>
*
* @param create whether to schedule the opening of a connection if no idle connections are available
* @return an idle connection or {@code null} if no idle connections are available
* @see #tryCreate(int)
*/
@Override @Override
public Connection acquire(boolean create) public Connection acquire(boolean create)
{ {
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Acquiring create={} on {}", create, this); LOG.debug("Acquiring create={} on {}", create, this);
Connection connection = activate(); Connection connection = activate();
if (connection == null && create) if (connection == null && (create || isMaximizeConnections()))
{ {
tryCreate(destination.getQueuedRequestCount()); tryCreate(destination.getQueuedRequestCount());
connection = activate(); connection = activate();

View File

@ -0,0 +1,79 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link MultiplexConnectionPool} that picks connections at a particular
* index between {@code 0} and {@link #getMaxConnectionCount()}.</p>
* <p>The algorithm that decides the index value is decided by subclasses.</p>
* <p>To acquire a connection, this class obtains the index value and attempts
* to activate the pool entry at that index.
* If this activation fails, another attempt to activate an alternative pool
* entry is performed, to avoid stalling connection acquisition if there is
* an available entry at a different index.</p>
*/
@ManagedObject
public abstract class IndexedConnectionPool extends MultiplexConnectionPool
{
private static final Logger LOG = LoggerFactory.getLogger(IndexedConnectionPool.class);
private final Pool<Connection> pool;
public IndexedConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, false, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
/**
* <p>Must return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
* used to attempt to acquire the connection at that index in the pool.</p>
*
* @param maxConnections the upper bound of the index (exclusive)
* @return an index between 0 (inclusive) and {@code maxConnections} (exclusive)
*/
protected abstract int getIndex(int maxConnections);
@Override
protected Connection activate()
{
int index = getIndex(getMaxConnectionCount());
Pool<Connection>.Entry entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activating at index={} entry={}", index, entry);
if (entry == null)
{
entry = pool.acquire();
if (LOG.isDebugEnabled())
LOG.debug("activating alternative entry={}", entry);
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
}
}

View File

@ -0,0 +1,43 @@
//
// ========================================================================
// Copyright (c) 1995-2020 Mort Bay Consulting Pty Ltd and others.
//
// This program and the accompanying materials are made available under
// the terms of the Eclipse Public License 2.0 which is available at
// https://www.eclipse.org/legal/epl-2.0
//
// This Source Code may also be made available under the following
// Secondary Licenses when the conditions for such availability set
// forth in the Eclipse Public License, v. 2.0 are satisfied:
// the Apache License v2.0 which is available at
// https://www.apache.org/licenses/LICENSE-2.0
//
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0
// ========================================================================
//
package org.eclipse.jetty.client;
import java.util.concurrent.ThreadLocalRandom;
import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.annotation.ManagedObject;
/**
* <p>An indexed {@link ConnectionPool} that provides connections
* randomly among the ones that are available.</p>
*/
@ManagedObject
public class RandomConnectionPool extends IndexedConnectionPool
{
public RandomConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{
super(destination, maxConnections, requester, maxMultiplex);
}
@Override
protected int getIndex(int maxConnections)
{
return ThreadLocalRandom.current().nextInt(maxConnections);
}
}

View File

@ -18,22 +18,39 @@
package org.eclipse.jetty.client; package org.eclipse.jetty.client;
import org.eclipse.jetty.client.api.Connection; import java.util.concurrent.atomic.AtomicInteger;
import org.eclipse.jetty.util.Callback; import org.eclipse.jetty.util.Callback;
import org.eclipse.jetty.util.Pool;
import org.eclipse.jetty.util.annotation.ManagedObject; import org.eclipse.jetty.util.annotation.ManagedObject;
import org.eclipse.jetty.util.thread.AutoLock;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>A {@link ConnectionPool} that attempts to provide connections using a round-robin algorithm.</p>
* <p>The round-robin behavior is almost impossible to achieve for several reasons:</p>
* <ul>
* <li>the server takes different times to serve different requests; if a request takes a long
* time to be processed by the server, it would be a performance penalty to stall sending requests
* waiting for that connection to be available - better skip it and try another connection</li>
* <li>connections may be closed by the client or by the server, so it should be a performance
* penalty to stall sending requests waiting for a new connection to be opened</li>
* <li>thread scheduling on both client and server may temporarily penalize a connection</li>
* </ul>
* <p>Do not expect this class to provide connections in a perfect recurring sequence such as
* {@code c0, c1, ..., cN-1, c0, c1, ..., cN-1, c0, c1, ...} because that is impossible to
* achieve in a real environment.
* This class will just attempt a best-effort to provide the connections in a sequential order,
* but most likely the order will be quasi-random.</p>
* <p>Applications using this class should {@link #preCreateConnections(int) pre-create}
* the connections to ensure that they are already opened when the application starts to requests
* them, otherwise the first connection that is opened may be used multiple times before the others
* are opened, resulting in a behavior that is more random-like than more round-robin-like (and
* that confirms that round-robin behavior is almost impossible to achieve).</p>
*
* @see RandomConnectionPool
*/
@ManagedObject @ManagedObject
public class RoundRobinConnectionPool extends MultiplexConnectionPool public class RoundRobinConnectionPool extends IndexedConnectionPool
{ {
private static final Logger LOG = LoggerFactory.getLogger(RoundRobinConnectionPool.class); private final AtomicInteger offset = new AtomicInteger();
private final AutoLock lock = new AutoLock();
private final Pool<Connection> pool;
private int offset;
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester) public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester)
{ {
@ -42,37 +59,17 @@ public class RoundRobinConnectionPool extends MultiplexConnectionPool
public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex) public RoundRobinConnectionPool(HttpDestination destination, int maxConnections, Callback requester, int maxMultiplex)
{ {
super(destination, maxConnections, false, requester, maxMultiplex); super(destination, maxConnections, requester, maxMultiplex);
pool = destination.getBean(Pool.class);
}
@Override
public Connection acquire(boolean create)
{
// If there are queued requests and connections get // If there are queued requests and connections get
// closed due to idle timeout or overuse, we want to // closed due to idle timeout or overuse, we want to
// aggressively try to open new connections to replace // aggressively try to open new connections to replace
// those that were closed to process queued requests. // those that were closed to process queued requests.
return super.acquire(true); setMaximizeConnections(true);
} }
@Override @Override
protected Connection activate() protected int getIndex(int maxConnections)
{ {
Pool<Connection>.Entry entry; return offset.getAndUpdate(v -> ++v == maxConnections ? 0 : v);
try (AutoLock l = lock.lock())
{
int index = Math.abs(offset % pool.getMaxEntries());
entry = pool.acquireAt(index);
if (LOG.isDebugEnabled())
LOG.debug("activated at index={} entry={}", index, entry);
if (entry != null)
++offset;
}
if (entry == null)
return null;
Connection connection = entry.getPooled();
acquired(connection);
return connection;
} }
} }

View File

@ -73,7 +73,8 @@ public class ConnectionPoolTest
{ {
return Stream.of( return Stream.of(
new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)), new ConnectionPoolFactory("duplex", destination -> new DuplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination)),
new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)) new ConnectionPoolFactory("multiplex", destination -> new MultiplexConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1)),
new ConnectionPoolFactory("random", destination -> new RandomConnectionPool(destination, destination.getHttpClient().getMaxConnectionsPerDestination(), destination, 1))
); );
} }

View File

@ -21,6 +21,7 @@ package org.eclipse.jetty.http.client;
import java.io.IOException; import java.io.IOException;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier; import java.util.concurrent.CyclicBarrier;
@ -42,6 +43,7 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource; import org.junit.jupiter.params.provider.ArgumentsSource;
import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue; import static org.junit.jupiter.api.Assertions.assertTrue;
@ -71,17 +73,21 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
}); });
int maxConnections = 3; int maxConnections = 3;
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination)); CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
// Prime the connections, so that they are all opened
// before we actually test the round robin behavior.
for (int i = 0; i < maxConnections; ++i)
{ {
ContentResponse response = scenario.client.newRequest(scenario.newURI()) RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
.timeout(5, TimeUnit.SECONDS) pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
.send(); return pool;
assertEquals(HttpStatus.OK_200, response.getStatus()); });
}
// Send one request to trigger destination creation
// and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);
record.set(true); record.set(true);
int requests = 2 * maxConnections - 1; int requests = 2 * maxConnections - 1;
@ -118,6 +124,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
int maxConnections = 3; int maxConnections = 3;
int count = maxConnections * maxMultiplex; int count = maxConnections * maxMultiplex;
AtomicBoolean record = new AtomicBoolean();
List<Integer> remotePorts = new CopyOnWriteArrayList<>(); List<Integer> remotePorts = new CopyOnWriteArrayList<>();
AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>(); AtomicReference<CountDownLatch> requestLatch = new AtomicReference<>();
CountDownLatch serverLatch = new CountDownLatch(count); CountDownLatch serverLatch = new CountDownLatch(count);
@ -129,10 +136,13 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
{ {
try try
{ {
remotePorts.add(request.getRemotePort()); if (record.get())
requestLatch.get().countDown(); {
serverLatch.countDown(); remotePorts.add(request.getRemotePort());
barrier.await(); requestLatch.get().countDown();
serverLatch.countDown();
barrier.await();
}
} }
catch (Exception x) catch (Exception x)
{ {
@ -141,11 +151,23 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
} }
}); });
scenario.client.getTransport().setConnectionPoolFactory(destination -> new RoundRobinConnectionPool(destination, maxConnections, destination, maxMultiplex)); CompletableFuture<Void> setup = new CompletableFuture<>();
scenario.client.getTransport().setConnectionPoolFactory(destination ->
{
RoundRobinConnectionPool pool = new RoundRobinConnectionPool(destination, maxConnections, destination);
pool.preCreateConnections(maxConnections).handle((r, x) -> x != null ? setup.completeExceptionally(x) : setup.complete(null));
return pool;
});
// Do not prime the connections, to see if the behavior is // Send one request to trigger destination creation
// correct even if the connections are not pre-created. // and connection pool pre-creation of connections,
// so we can test reliably the round-robin behavior.
scenario.client.newRequest(scenario.newURI())
.timeout(5, TimeUnit.SECONDS)
.send();
setup.get(5, TimeUnit.SECONDS);
record.set(true);
CountDownLatch clientLatch = new CountDownLatch(count); CountDownLatch clientLatch = new CountDownLatch(count);
AtomicInteger requests = new AtomicInteger(); AtomicInteger requests = new AtomicInteger();
for (int i = 0; i < count; ++i) for (int i = 0; i < count; ++i)
@ -171,7 +193,7 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
barrier.await(); barrier.await();
assertTrue(clientLatch.await(5, TimeUnit.SECONDS)); assertTrue(clientLatch.await(5, TimeUnit.SECONDS));
assertThat(remotePorts.size(), Matchers.equalTo(count)); assertThat(remotePorts.toString(), remotePorts.size(), Matchers.equalTo(count));
for (int i = 0; i < count; ++i) for (int i = 0; i < count; ++i)
{ {
int base = i % maxConnections; int base = i % maxConnections;
@ -194,9 +216,9 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
multiplex = 2; multiplex = 2;
int maxMultiplex = multiplex; int maxMultiplex = multiplex;
int maxUsage = 2; int maxUsage = 3;
int maxConnections = 2; int maxConnections = 4;
int count = maxConnections * maxMultiplex * maxUsage; int count = 2 * maxConnections * maxMultiplex * maxUsage;
List<Integer> remotePorts = new CopyOnWriteArrayList<>(); List<Integer> remotePorts = new CopyOnWriteArrayList<>();
scenario.start(new EmptyServerHandler() scenario.start(new EmptyServerHandler()
@ -229,9 +251,14 @@ public class RoundRobinConnectionPoolTest extends AbstractTest<TransportScenario
assertTrue(clientLatch.await(count, TimeUnit.SECONDS)); assertTrue(clientLatch.await(count, TimeUnit.SECONDS));
assertEquals(count, remotePorts.size()); assertEquals(count, remotePorts.size());
// Maps {remote_port -> number_of_times_port_was_used}.
Map<Integer, Long> results = remotePorts.stream() Map<Integer, Long> results = remotePorts.stream()
.collect(Collectors.groupingBy(Function.identity(), Collectors.counting())); .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
assertEquals(count / maxUsage, results.size(), remotePorts.toString()); // RoundRobinConnectionPool may open more connections than expected.
assertEquals(1, results.values().stream().distinct().count(), remotePorts.toString()); // For example with maxUsage=2, requests could be sent to these ports:
// [p1, p2, p3 | p1, p2, p3 | p4, p4, p5 | p6, p5, p7]
// Opening p5 and p6 was delayed, so the opening of p7 was triggered
// to replace p4 while p5 and p6 were busy sending their requests.
assertThat(remotePorts.toString(), count / maxUsage, lessThanOrEqualTo(results.size()));
} }
} }