Improve AIMDBackoffManager with atomic references, thread-safety, and parameter checks

In this commit, we have made several improvements to the AIMDBackoffManager class:

* Replaced volatile variables with AtomicReference and AtomicInteger to ensure better thread-safety and atomic operations.
* Updated the @Contract annotation to reflect the new thread-safe behavior of the class.
 * Added parameter validation checks to public methods for better error handling and prevention of incorrect usage.
* Improved the code readability and organization by separating sections of the class with blank lines and consistent indentation.
* These enhancements make the AIMDBackoffManager class more robust, reliable, and easier to maintain.
This commit is contained in:
Arturo Bernal 2023-04-24 21:44:24 +02:00 committed by Oleg Kalnichevski
parent 1bd7f07d17
commit b565f8fab7
3 changed files with 192 additions and 91 deletions

View File

@ -26,15 +26,21 @@
*/
package org.apache.hc.client5.http.impl.classic;
import java.util.HashMap;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.BackoffManager;
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The {@code AIMDBackoffManager} applies an additive increase,
@ -56,79 +62,134 @@ import org.apache.hc.core5.util.TimeValue;
*
* @since 4.2
*/
@Experimental
@Contract(threading = ThreadingBehavior.SAFE)
public class AIMDBackoffManager implements BackoffManager {
private final ConnPoolControl<HttpRoute> connPerRoute;
private final Clock clock;
private final Map<HttpRoute, Long> lastRouteProbes;
private final Map<HttpRoute, Long> lastRouteBackoffs;
private TimeValue coolDown = TimeValue.ofSeconds(5L);
private double backoffFactor = 0.5;
private int cap = 2; // Per RFC 2616 sec 8.1.4
private static final Logger LOG = LoggerFactory.getLogger(AIMDBackoffManager.class);
/**
* Creates an {@code AIMDBackoffManager} to manage
* per-host connection pool sizes represented by the
* given {@link ConnPoolControl}.
* @param connPerRoute per-host routing maximums to
* be managed
* Represents the connection pool control object for managing
* the maximum number of connections allowed per HTTP route.
*/
private final ConnPoolControl<HttpRoute> connPerRoute;
/**
* A map that keeps track of the last time a successful
* connection was made for each HTTP route. Used for
* adjusting the pool size when probing.
*/
private final Map<HttpRoute, Instant> lastRouteProbes;
/**
* A map that keeps track of the last time a connection
* failure occurred for each HTTP route. Used for
* adjusting the pool size when backing off.
*/
private final Map<HttpRoute, Instant> lastRouteBackoffs;
/**
* The cool-down time between adjustments in pool sizes for a given host.
* This time value allows enough time for the adjustments to take effect.
* Defaults to 5 seconds.
*/
private final AtomicReference<TimeValue> coolDown = new AtomicReference<>(TimeValue.ofSeconds(5L));
/**
* The factor to use when backing off; the new per-host limit will be
* roughly the current max times this factor. {@code Math.floor} is
* applied in the case of non-integer outcomes to ensure we actually
* decrease the pool size. Pool sizes are never decreased below 1, however.
* Defaults to 0.5.
*/
private final AtomicReference<Double> backoffFactor = new AtomicReference<>(0.5);
/**
* The absolute maximum per-host connection pool size to probe up to.
* Defaults to 2 (the default per-host max as per RFC 2616 section 8.1.4).
*/
private final AtomicInteger cap = new AtomicInteger(2); // Per RFC 2616 sec 8.1.4
/**
* Constructs an {@code AIMDBackoffManager} with the specified
* {@link ConnPoolControl} and {@link Clock}.
* <p>
* This constructor is primarily used for testing purposes, allowing the
* injection of a custom {@link Clock} implementation.
*
* @param connPerRoute the {@link ConnPoolControl} that manages
* per-host routing maximums
*/
public AIMDBackoffManager(final ConnPoolControl<HttpRoute> connPerRoute) {
this(connPerRoute, new SystemClock());
}
AIMDBackoffManager(final ConnPoolControl<HttpRoute> connPerRoute, final Clock clock) {
this.clock = clock;
this.connPerRoute = connPerRoute;
this.lastRouteProbes = new HashMap<>();
this.lastRouteBackoffs = new HashMap<>();
this.connPerRoute = Args.notNull(connPerRoute, "Connection pool control");
this.lastRouteProbes = new ConcurrentHashMap<>();
this.lastRouteBackoffs = new ConcurrentHashMap<>();
}
@Override
public void backOff(final HttpRoute route) {
synchronized(connPerRoute) {
final int curr = connPerRoute.getMaxPerRoute(route);
final Long lastUpdate = getLastUpdate(lastRouteBackoffs, route);
final long now = clock.getCurrentTime();
if (now - lastUpdate < coolDown.toMilliseconds()) {
return;
final int curr = connPerRoute.getMaxPerRoute(route);
final Instant now = Instant.now();
lastRouteBackoffs.compute(route, (r, lastUpdate) -> {
if (lastUpdate == null || now.isAfter(lastUpdate.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backoff applied for route: {}, new max connections: {}", route, connPerRoute.getMaxPerRoute(route));
}
connPerRoute.setMaxPerRoute(route, getBackedOffPoolSize(curr));
return now;
}
connPerRoute.setMaxPerRoute(route, getBackedOffPoolSize(curr));
lastRouteBackoffs.put(route, now);
}
return lastUpdate;
});
}
/**
* Returns the backed-off pool size based on the current pool size.
* The new pool size is calculated as the floor of (backoffFactor * curr).
*
* @param curr the current pool size
* @return the backed-off pool size, with a minimum value of 1
*/
private int getBackedOffPoolSize(final int curr) {
if (curr <= 1) {
return 1;
}
return (int)(Math.floor(backoffFactor * curr));
return (int) (Math.floor(backoffFactor.get() * curr));
}
@Override
public void probe(final HttpRoute route) {
synchronized(connPerRoute) {
final int curr = connPerRoute.getMaxPerRoute(route);
final int max = (curr >= cap) ? cap : curr + 1;
final Long lastProbe = getLastUpdate(lastRouteProbes, route);
final Long lastBackoff = getLastUpdate(lastRouteBackoffs, route);
final long now = clock.getCurrentTime();
if (now - lastProbe < coolDown.toMilliseconds()
|| now - lastBackoff < coolDown.toMilliseconds()) {
return;
final int curr = connPerRoute.getMaxPerRoute(route);
final int max = (curr >= cap.get()) ? cap.get() : curr + 1;
final Instant now = Instant.now();
lastRouteProbes.compute(route, (r, lastProbe) -> {
if (lastProbe == null || now.isAfter(lastProbe.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
final Instant lastBackoff = lastRouteBackoffs.get(r);
if (lastBackoff == null || now.isAfter(lastBackoff.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
connPerRoute.setMaxPerRoute(route, max);
if (LOG.isDebugEnabled()){
LOG.info("Probe applied for route: {}, new max connections: {}", route, connPerRoute.getMaxPerRoute(route));
}
return now;
}
}
connPerRoute.setMaxPerRoute(route, max);
lastRouteProbes.put(route, now);
}
return lastProbe;
});
}
private Long getLastUpdate(final Map<HttpRoute, Long> updates, final HttpRoute route) {
Long lastUpdate = updates.get(route);
if (lastUpdate == null) {
lastUpdate = 0L;
}
return lastUpdate;
/**
* Returns the last update time of a specific route from the provided updates map.
*
* @param updates the map containing the last update times
* @param route the HttpRoute whose last update time is required
* @return the last update time or 0L if the route is not present in the map
*/
private long getLastUpdate(final Map<HttpRoute, Long> updates, final HttpRoute route) {
return updates.getOrDefault(route, 0L);
}
/**
@ -142,7 +203,7 @@ public class AIMDBackoffManager implements BackoffManager {
*/
public void setBackoffFactor(final double d) {
Args.check(d > 0.0 && d < 1.0, "Backoff factor must be 0.0 < f < 1.0");
backoffFactor = d;
backoffFactor.set(d);
}
/**
@ -152,8 +213,9 @@ public class AIMDBackoffManager implements BackoffManager {
* @param coolDown must be positive
*/
public void setCoolDown(final TimeValue coolDown) {
Args.notNull(coolDown, "Cool down time value cannot be null");
Args.positive(coolDown.getDuration(), "coolDown");
this.coolDown = coolDown;
this.coolDown.set(coolDown);
}
/**
@ -163,7 +225,6 @@ public class AIMDBackoffManager implements BackoffManager {
*/
public void setPerHostConnectionCap(final int cap) {
Args.positive(cap, "Per host connection cap");
this.cap = cap;
this.cap.set(cap);
}
}

View File

@ -35,12 +35,13 @@ import org.apache.hc.client5.http.classic.ConnectionBackoffStrategy;
import org.apache.hc.client5.http.classic.ExecChain;
import org.apache.hc.client5.http.classic.ExecChainHandler;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.Experimental;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.http.ClassicHttpRequest;
import org.apache.hc.core5.http.ClassicHttpResponse;
import org.apache.hc.core5.http.HttpException;
import org.apache.hc.core5.util.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Request execution handler in the classic request execution chain
@ -54,12 +55,22 @@ import org.apache.hc.core5.util.Args;
* @since 4.3
*/
@Contract(threading = ThreadingBehavior.STATELESS)
@Experimental
public final class BackoffStrategyExec implements ExecChainHandler {
private static final Logger LOG = LoggerFactory.getLogger(BackoffStrategyExec.class);
private final ConnectionBackoffStrategy connectionBackoffStrategy;
private final BackoffManager backoffManager;
/**
* Constructs a {@code BackoffStrategyExec} with the specified
* {@link ConnectionBackoffStrategy} and {@link BackoffManager}.
*
* @param connectionBackoffStrategy the strategy to determine whether
* to backoff based on the response or exception
* @param backoffManager the manager responsible for applying backoff
* and probing actions to the HTTP routes
*/
public BackoffStrategyExec(
final ConnectionBackoffStrategy connectionBackoffStrategy,
final BackoffManager backoffManager) {
@ -84,13 +95,22 @@ public final class BackoffStrategyExec implements ExecChainHandler {
response = chain.proceed(request, scope);
} catch (final IOException | HttpException ex) {
if (this.connectionBackoffStrategy.shouldBackoff(ex)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backing off route {} due to exception: {}", route, ex.getMessage());
}
this.backoffManager.backOff(route);
}
throw ex;
}
if (this.connectionBackoffStrategy.shouldBackoff(response)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backing off route {} due to response status: {}", route, response.getCode());
}
this.backoffManager.backOff(route);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Probing route: {}", route);
}
this.backoffManager.probe(route);
}
return response;

View File

@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.Random;
import java.util.concurrent.CountDownLatch;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.BackoffManager;
@ -44,15 +45,17 @@ public class TestAIMDBackoffManager {
private AIMDBackoffManager impl;
private MockConnPoolControl connPerRoute;
private HttpRoute route;
private MockClock clock;
private static final long DEFAULT_COOL_DOWN_MS = 5000; // Adjust this value to match the default cooldown period in AIMDBackoffManager
@BeforeEach
public void setUp() {
connPerRoute = new MockConnPoolControl();
route = new HttpRoute(new HttpHost("localhost", 80));
clock = new MockClock();
impl = new AIMDBackoffManager(connPerRoute, clock);
impl = new AIMDBackoffManager(connPerRoute);
impl.setPerHostConnectionCap(10);
impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS));
}
@Test
@ -91,49 +94,41 @@ public class TestAIMDBackoffManager {
}
@Test
public void backoffDoesNotAdjustDuringCoolDownPeriod() {
public void backoffDoesNotAdjustDuringCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
final long now = System.currentTimeMillis();
clock.setCurrentTime(now);
impl.backOff(route);
final long max = connPerRoute.getMaxPerRoute(route);
clock.setCurrentTime(now + 1);
Thread.sleep(1); // Sleep for 1 ms
impl.backOff(route);
assertEquals(max, connPerRoute.getMaxPerRoute(route));
}
@Test
public void backoffStillAdjustsAfterCoolDownPeriod() {
public void backoffStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 8);
final long now = System.currentTimeMillis();
clock.setCurrentTime(now);
impl.backOff(route);
final long max = connPerRoute.getMaxPerRoute(route);
clock.setCurrentTime(now + 10 * 1000L);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1); // Sleep for cooldown period + 1 ms
impl.backOff(route);
assertTrue(max == 1 || max > connPerRoute.getMaxPerRoute(route));
}
@Test
public void probeDoesNotAdjustDuringCooldownPeriod() {
public void probeDoesNotAdjustDuringCooldownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
final long now = System.currentTimeMillis();
clock.setCurrentTime(now);
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
clock.setCurrentTime(now + 1);
Thread.sleep(1); // Sleep for 1 ms
impl.probe(route);
assertEquals(max, connPerRoute.getMaxPerRoute(route));
}
@Test
public void probeStillAdjustsAfterCoolDownPeriod() {
public void probeStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 8);
final long now = System.currentTimeMillis();
clock.setCurrentTime(now);
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
clock.setCurrentTime(now + 10 * 1000L);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1); // Sleep for cooldown period + 1 ms
impl.probe(route);
assertTrue(max < connPerRoute.getMaxPerRoute(route));
}
@ -142,10 +137,8 @@ public class TestAIMDBackoffManager {
public void willBackoffImmediatelyEvenAfterAProbe() {
connPerRoute.setMaxPerRoute(route, 8);
final long now = System.currentTimeMillis();
clock.setCurrentTime(now);
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
clock.setCurrentTime(now + 1);
impl.backOff(route);
assertTrue(connPerRoute.getMaxPerRoute(route) < max);
}
@ -159,24 +152,51 @@ public class TestAIMDBackoffManager {
}
@Test
public void coolDownPeriodIsConfigurable() {
long cd = new Random().nextLong() / 2;
if (cd < 0) {
cd *= -1;
}
if (cd < 1) {
cd++;
}
final long now = System.currentTimeMillis();
public void coolDownPeriodIsConfigurable() throws InterruptedException {
final long cd = new Random().nextInt(500) + 500; // Random cooldown period between 500 and 1000 milliseconds
impl.setCoolDown(TimeValue.ofMilliseconds(cd));
clock.setCurrentTime(now);
// Probe and check if the connection count remains the same during the cooldown period
impl.probe(route);
final int max0 = connPerRoute.getMaxPerRoute(route);
clock.setCurrentTime(now);
Thread.sleep(cd / 2); // Sleep for half the cooldown period
impl.probe(route);
assertEquals(max0, connPerRoute.getMaxPerRoute(route));
clock.setCurrentTime(now + cd + 1);
// Probe and check if the connection count increases after the cooldown period
Thread.sleep(cd / 2 + 1); // Sleep for the remaining half of the cooldown period + 1 ms
impl.probe(route);
assertTrue(max0 < connPerRoute.getMaxPerRoute(route));
}
@Test
public void testConcurrency() throws InterruptedException {
final int initialMaxPerRoute = 10;
final int numberOfThreads = 20;
final int numberOfOperationsPerThread = 100;
connPerRoute.setMaxPerRoute(route, initialMaxPerRoute);
final CountDownLatch latch = new CountDownLatch(numberOfThreads);
final Runnable backoffAndProbeTask = () -> {
for (int i = 0; i < numberOfOperationsPerThread; i++) {
if (Math.random() < 0.5) {
impl.backOff(route);
} else {
impl.probe(route);
}
}
latch.countDown();
};
for (int i = 0; i < numberOfThreads; i++) {
new Thread(backoffAndProbeTask).start();
}
latch.await();
final int finalMaxPerRoute = connPerRoute.getMaxPerRoute(route);
// The final value should be within an acceptable range (e.g., 5 to 15) since the number of backOff and probe operations should balance out over time
assertTrue(finalMaxPerRoute >= initialMaxPerRoute - 5 && finalMaxPerRoute <= initialMaxPerRoute + 5);
}
}