Add AbstractBackoff, LinearBackoffManager, and ExponentialBackoffManager classes

- Create AbstractBackoff class as a base for different backoff strategies
- Add LinearBackoffManager class for implementing linear backoff algorithm in networking and communication systems. This class provides a thread-safe and configurable implementation of linear backoff, allowing for gradual adjustment of maximum connection pool sizes for a given route based on traffic and other factors. The class supports customizable cool-down periods and increment values for each route, and uses a ConcurrentHashMap to ensure thread safety and accurate tracking of backoff and probe attempts
- Implement ExponentialBackoffManager for connection pool control.
This commit introduces the ExponentialBackoffManager class, which manages the connection pool control for HTTP routes based on the Exponential Backoff algorithm. This implementation helps improve connection handling and stability in case of connection failures or network issues.
This commit is contained in:
Arturo Bernal 2023-04-29 22:34:26 +02:00 committed by Oleg Kalnichevski
parent b565f8fab7
commit f203dcd34e
6 changed files with 919 additions and 134 deletions

View File

@ -26,21 +26,11 @@
*/
package org.apache.hc.client5.http.impl.classic;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.BackoffManager;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* <p>The {@code AIMDBackoffManager} applies an additive increase,
@ -63,53 +53,7 @@ import org.slf4j.LoggerFactory;
* @since 4.2
*/
@Contract(threading = ThreadingBehavior.SAFE)
public class AIMDBackoffManager implements BackoffManager {
private static final Logger LOG = LoggerFactory.getLogger(AIMDBackoffManager.class);
/**
* Represents the connection pool control object for managing
* the maximum number of connections allowed per HTTP route.
*/
private final ConnPoolControl<HttpRoute> connPerRoute;
/**
* A map that keeps track of the last time a successful
* connection was made for each HTTP route. Used for
* adjusting the pool size when probing.
*/
private final Map<HttpRoute, Instant> lastRouteProbes;
/**
* A map that keeps track of the last time a connection
* failure occurred for each HTTP route. Used for
* adjusting the pool size when backing off.
*/
private final Map<HttpRoute, Instant> lastRouteBackoffs;
/**
* The cool-down time between adjustments in pool sizes for a given host.
* This time value allows enough time for the adjustments to take effect.
* Defaults to 5 seconds.
*/
private final AtomicReference<TimeValue> coolDown = new AtomicReference<>(TimeValue.ofSeconds(5L));
/**
* The factor to use when backing off; the new per-host limit will be
* roughly the current max times this factor. {@code Math.floor} is
* applied in the case of non-integer outcomes to ensure we actually
* decrease the pool size. Pool sizes are never decreased below 1, however.
* Defaults to 0.5.
*/
private final AtomicReference<Double> backoffFactor = new AtomicReference<>(0.5);
/**
* The absolute maximum per-host connection pool size to probe up to.
* Defaults to 2 (the default per-host max as per RFC 2616 section 8.1.4).
*/
private final AtomicInteger cap = new AtomicInteger(2); // Per RFC 2616 sec 8.1.4
public class AIMDBackoffManager extends AbstractBackoff {
/**
* Constructs an {@code AIMDBackoffManager} with the specified
@ -122,29 +66,9 @@ public class AIMDBackoffManager implements BackoffManager {
* per-host routing maximums
*/
public AIMDBackoffManager(final ConnPoolControl<HttpRoute> connPerRoute) {
this.connPerRoute = Args.notNull(connPerRoute, "Connection pool control");
this.lastRouteProbes = new ConcurrentHashMap<>();
this.lastRouteBackoffs = new ConcurrentHashMap<>();
super(connPerRoute);
}
@Override
public void backOff(final HttpRoute route) {
final int curr = connPerRoute.getMaxPerRoute(route);
final Instant now = Instant.now();
lastRouteBackoffs.compute(route, (r, lastUpdate) -> {
if (lastUpdate == null || now.isAfter(lastUpdate.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
if (LOG.isDebugEnabled()) {
LOG.debug("Backoff applied for route: {}, new max connections: {}", route, connPerRoute.getMaxPerRoute(route));
}
connPerRoute.setMaxPerRoute(route, getBackedOffPoolSize(curr));
return now;
}
return lastUpdate;
});
}
/**
* Returns the backed-off pool size based on the current pool size.
* The new pool size is calculated as the floor of (backoffFactor * curr).
@ -152,46 +76,14 @@ public class AIMDBackoffManager implements BackoffManager {
* @param curr the current pool size
* @return the backed-off pool size, with a minimum value of 1
*/
private int getBackedOffPoolSize(final int curr) {
protected int getBackedOffPoolSize(final int curr) {
if (curr <= 1) {
return 1;
}
return (int) (Math.floor(backoffFactor.get() * curr));
return (int) (Math.floor(getBackoffFactor().get() * curr));
}
@Override
public void probe(final HttpRoute route) {
final int curr = connPerRoute.getMaxPerRoute(route);
final int max = (curr >= cap.get()) ? cap.get() : curr + 1;
final Instant now = Instant.now();
lastRouteProbes.compute(route, (r, lastProbe) -> {
if (lastProbe == null || now.isAfter(lastProbe.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
final Instant lastBackoff = lastRouteBackoffs.get(r);
if (lastBackoff == null || now.isAfter(lastBackoff.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
connPerRoute.setMaxPerRoute(route, max);
if (LOG.isDebugEnabled()){
LOG.info("Probe applied for route: {}, new max connections: {}", route, connPerRoute.getMaxPerRoute(route));
}
return now;
}
}
return lastProbe;
});
}
/**
* Returns the last update time of a specific route from the provided updates map.
*
* @param updates the map containing the last update times
* @param route the HttpRoute whose last update time is required
* @return the last update time or 0L if the route is not present in the map
*/
private long getLastUpdate(final Map<HttpRoute, Long> updates, final HttpRoute route) {
return updates.getOrDefault(route, 0L);
}
/**
* Sets the factor to use when backing off; the new
* per-host limit will be roughly the current max times
@ -201,30 +93,10 @@ public class AIMDBackoffManager implements BackoffManager {
* below 1, however. Defaults to 0.5.
* @param d must be between 0.0 and 1.0, exclusive.
*/
@Override
public void setBackoffFactor(final double d) {
Args.check(d > 0.0 && d < 1.0, "Backoff factor must be 0.0 < f < 1.0");
backoffFactor.set(d);
getBackoffFactor().set(d);
}
/**
* Sets the amount of time to wait between adjustments in
* pool sizes for a given host, to allow enough time for
* the adjustments to take effect. Defaults to 5 seconds.
* @param coolDown must be positive
*/
public void setCoolDown(final TimeValue coolDown) {
Args.notNull(coolDown, "Cool down time value cannot be null");
Args.positive(coolDown.getDuration(), "coolDown");
this.coolDown.set(coolDown);
}
/**
* Sets the absolute maximum per-host connection pool size to
* probe up to; defaults to 2 (the default per-host max).
* @param cap must be &gt;= 1
*/
public void setPerHostConnectionCap(final int cap) {
Args.positive(cap, "Per host connection cap");
this.cap.set(cap);
}
}

View File

@ -0,0 +1,281 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.classic;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.BackoffManager;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.util.Args;
import org.apache.hc.core5.util.TimeValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
/**
* AbstractBackoff is an abstract class that provides a common implementation for managing
* backoff behavior in HttpClient connection pool. Subclasses should implement the specific
* backoff algorithms by overriding the abstract methods.
* <p>
* This class provides common functionality for maintaining the route-wise backoff and probe
* timestamps, as well as the cool-down period for each backoff attempt.
* <p>
* It also contains the basic structure of the backOff and probe methods, which use the route-wise
* timestamps to determine if the backoff or probe should be applied, and then call the specific
* algorithm implementation for calculating the new pool size.
*
* @since 5.3
*/
public abstract class AbstractBackoff implements BackoffManager {
private static final Logger LOG = LoggerFactory.getLogger(AbstractBackoff.class);
/**
* Connection pool control responsible for managing the maximum number of connections per HTTP route.
*/
private final ConnPoolControl<HttpRoute> connPerRoute;
/**
* A map that stores the last probe timestamp for each HTTP route.
*/
private final Map<HttpRoute, Instant> lastRouteProbes;
/**
* A map that stores the last backoff timestamp for each HTTP route.
*/
private final Map<HttpRoute, Instant> lastRouteBackoffs;
/**
* The cool-down period after which the backoff or probe process can be performed again.
*/
private final AtomicReference<TimeValue> coolDown = new AtomicReference<>(TimeValue.ofSeconds(5L));
/**
* The growth rate used in the exponential backoff algorithm.
*/
private final AtomicReference<Double> backoffFactor = new AtomicReference<>(0.5);
/**
* The per-host connection cap, as defined in RFC 2616 sec 8.1.4.
*/
private final AtomicInteger cap = new AtomicInteger(2);
/**
* The number of time intervals used in the exponential backoff algorithm.
*/
private final AtomicInteger timeInterval = new AtomicInteger(0);
/**
* Constructs a new ExponentialBackoffManager with the specified connection pool control.
*
* @param connPerRoute the connection pool control to be used for managing connections
* @throws IllegalArgumentException if connPerRoute is null
*/
public AbstractBackoff(final ConnPoolControl<HttpRoute> connPerRoute) {
this.connPerRoute = Args.notNull(connPerRoute, "Connection pool control");
this.lastRouteProbes = new ConcurrentHashMap<>();
this.lastRouteBackoffs = new ConcurrentHashMap<>();
}
/**
* Reduces the number of maximum allowed connections for the specified route based on the exponential backoff algorithm.
*
* @param route the HttpRoute for which the backoff needs to be applied
*/
@Override
public void backOff(final HttpRoute route) {
final int curr = connPerRoute.getMaxPerRoute(route);
final Instant now = Instant.now();
lastRouteBackoffs.compute(route, (r, lastUpdate) -> {
if (lastUpdate == null || now.isAfter(lastUpdate.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
final int backedOffPoolSize = getBackedOffPoolSize(curr); // Exponential backoff
connPerRoute.setMaxPerRoute(route, backedOffPoolSize);
if (LOG.isDebugEnabled()) {
LOG.debug("Backoff applied for route: {}, new max connections: {}", route, connPerRoute.getMaxPerRoute(route));
}
return now;
}
return lastUpdate;
});
}
/**
* Calculates the new pool size after applying the exponential backoff algorithm.
* The new pool size is calculated using the formula: floor(curr / (1 + growthRate) ^ t),
* where curr is the current pool size, growthRate is the exponential growth rate, and t is the time interval.
*
* @param curr the current pool size
* @return the new pool size after applying the backoff
*/
protected abstract int getBackedOffPoolSize(int curr);
/**
* Increases the number of maximum allowed connections for the specified route after a successful connection has been established.
*
* @param route the HttpRoute for which the probe needs to be applied
*/
@Override
public void probe(final HttpRoute route) {
final int curr = connPerRoute.getMaxPerRoute(route);
final int max = (curr >= cap.get()) ? cap.get() : curr + 1;
final Instant now = Instant.now();
lastRouteProbes.compute(route, (r, lastProbe) -> {
if (lastProbe == null || now.isAfter(lastProbe.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
final Instant lastBackoff = lastRouteBackoffs.get(r);
if (lastBackoff == null || now.isAfter(lastBackoff.plus(coolDown.get().toMilliseconds(), ChronoUnit.MILLIS))) {
connPerRoute.setMaxPerRoute(route, max);
if (LOG.isDebugEnabled()) {
LOG.info("Probe applied for route: {}, new max connections: {}", route, connPerRoute.getMaxPerRoute(route));
}
timeInterval.set(0); // Reset the time interval
return now;
}
}
return lastProbe;
});
}
/**
* Retrieves the last update timestamp for the specified route from the provided updates map.
*
* @param updates the map containing update timestamps for HttpRoutes
* @param route the HttpRoute for which the last update timestamp is needed
* @return the last update timestamp for the specified route or 0L if not present in the map
*/
public long getLastUpdate(final Map<HttpRoute, Long> updates, final HttpRoute route) {
return updates.getOrDefault(route, 0L);
}
/**
* Sets the per-host connection cap.
*
* @param cap the per-host connection cap to be set
* @throws IllegalArgumentException if the cap is not positive
*/
public void setPerHostConnectionCap(final int cap) {
Args.positive(cap, "Per host connection cap");
this.cap.set(cap);
}
/**
* Sets the backoff factor for the backoff algorithm.
* The backoff factor should be a value between 0.0 and 1.0.
* The specific implementation of how the backoff factor is used should be provided by subclasses.
*
* @param d the backoff factor to be set
*/
abstract void setBackoffFactor(final double d);
/**
* Sets the cool-down time value for adjustments in pool sizes for a given host. This time value
* allows enough time for the adjustments to take effect before further adjustments are made.
* The cool-down time value must be positive and not null.
*
* @param coolDown the TimeValue representing the cool-down period between adjustments
* @throws IllegalArgumentException if the provided cool-down time value is null or non-positive
*/
public void setCoolDown(final TimeValue coolDown) {
Args.notNull(coolDown, "Cool down time value cannot be null");
Args.positive(coolDown.getDuration(), "coolDown");
this.coolDown.set(coolDown);
}
/**
* Returns the connection pool control for managing the maximum number of connections per route.
*
* @return the connection pool control instance
*/
protected ConnPoolControl<HttpRoute> getConnPerRoute() {
return connPerRoute;
}
/**
* Returns the map containing the last probe times for each HttpRoute.
*
* @return the map of HttpRoute to Instant representing the last probe times
*/
protected Map<HttpRoute, Instant> getLastRouteProbes() {
return lastRouteProbes;
}
/**
* Returns the map containing the last backoff times for each HttpRoute.
*
* @return the map of HttpRoute to Instant representing the last backoff times
*/
protected Map<HttpRoute, Instant> getLastRouteBackoffs() {
return lastRouteBackoffs;
}
/**
* Returns the cool down period between backoff and probe operations as an AtomicReference of TimeValue.
*
* @return the AtomicReference containing the cool down period
*/
protected AtomicReference<TimeValue> getCoolDown() {
return coolDown;
}
/**
* Returns the backoff factor as an AtomicReference of Double.
*
* @return the AtomicReference containing the backoff factor
*/
protected AtomicReference<Double> getBackoffFactor() {
return backoffFactor;
}
/**
* Returns the cap on the maximum number of connections per route as an AtomicInteger.
*
* @return the AtomicInteger containing the cap value
*/
protected AtomicInteger getCap() {
return cap;
}
/**
* Returns the time interval between backoff and probe operations as an AtomicInteger.
*
* @return the AtomicInteger containing the time interval
*/
protected AtomicInteger getTimeInterval() {
return timeInterval;
}
}

View File

@ -0,0 +1,92 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.classic;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.util.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A backoff manager implementation that uses an exponential backoff algorithm to adjust the maximum
* number of connections per HTTP route. The algorithm reduces the number of connections in response
* to adverse events, such as connection failures, and gradually increases the number of connections
* when the route is operating without issues.
*
* <p>This implementation is specifically designed for managing connections in an HTTP route context
* and provides methods for probing and backing off connections based on the performance of the route.
*
* <p>The exponential backoff algorithm is primarily implemented in the {@code getBackedOffPoolSize}
* method, which calculates the new connection pool size based on the current pool size, growth rate,
* and the number of time intervals.
*
* @since 5.3
*/
public class ExponentialBackoffManager extends AbstractBackoff {
private static final Logger LOG = LoggerFactory.getLogger(ExponentialBackoffManager.class);
/**
* Constructs a new ExponentialBackoffManager with the specified connection pool control.
*
* @param connPerRoute the connection pool control to be used for managing connections
* @throws IllegalArgumentException if connPerRoute is null
*/
public ExponentialBackoffManager(final ConnPoolControl<HttpRoute> connPerRoute) {
super(connPerRoute);
}
/**
* Calculates the new pool size after applying the exponential backoff algorithm.
* The new pool size is calculated using the formula: floor(curr / (1 + growthRate) ^ t),
* where curr is the current pool size, growthRate is the exponential growth rate, and t is the time interval.
*
* @param curr the current pool size
* @return the new pool size after applying the backoff
*/
protected int getBackedOffPoolSize(final int curr) {
if (curr <= 1) {
return 1;
}
final int t = getTimeInterval().incrementAndGet();
final int result = Math.max(1, (int) Math.floor(curr / Math.pow(1 + getBackoffFactor().get(), t)));
if (LOG.isDebugEnabled()) {
LOG.debug("curr={}, t={}, growthRate={}, result={}", curr, t, getBackoffFactor().get(), result);
}
return result;
}
public void setBackoffFactor(final double rate) {
Args.check(rate > 0.0, "Growth rate must be greater than 0.0");
this.getBackoffFactor().set(rate);
}
}

View File

@ -0,0 +1,221 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.classic;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.client5.http.classic.BackoffManager;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
import org.apache.hc.core5.pool.ConnPoolControl;
import org.apache.hc.core5.util.Args;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Duration;
import java.time.Instant;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
/**
* An implementation of {@link BackoffManager} that uses a linear backoff strategy to adjust the maximum number
* of connections per route in an {@link org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager}.
* This class is designed to be thread-safe and can be used in multi-threaded environments.
* <p>
* The linear backoff strategy increases or decreases the maximum number of connections per route by a fixed increment
* when backing off or probing, respectively. The adjustments are made based on a cool-down period, during which no
* further adjustments will be made.
* <p>
* The {@code LinearBackoffManager} is intended to be used with a {@link org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager},
* which provides the {@link ConnPoolControl} interface. This class interacts with the {@code PoolingHttpClientConnectionManager}
* to adjust the maximum number of connections per route.
* <p>
* Example usage:
* <pre>
* PoolingHttpClientConnectionManager connectionManager = new PoolingHttpClientConnectionManager();
* LinearBackoffManager backoffManager = new LinearBackoffManager(connectionManager, 1);
* // Use the backoffManager with the connectionManager in your application
* </pre>
*
* @see BackoffManager
* @see ConnPoolControl
* @see org.apache.hc.client5.http.impl.io.PoolingHttpClientConnectionManager
* @since 5.3
*/
@Contract(threading = ThreadingBehavior.SAFE)
public class LinearBackoffManager extends AbstractBackoff {
private static final Logger LOG = LoggerFactory.getLogger(org.slf4j.LoggerFactory.class);
/**
* The backoff increment used when adjusting connection pool sizes.
* The pool size will be increased or decreased by this value during the backoff process.
* The increment must be positive.
*/
private final int increment;
private final ConcurrentHashMap<HttpRoute, AtomicInteger> routeAttempts;
/**
* Constructs a new LinearBackoffManager with the specified connection pool control.
* The backoff increment is set to {@code 1} by default.
*
* @param connPoolControl the connection pool control to be used by this LinearBackoffManager
*/
public LinearBackoffManager(final ConnPoolControl<HttpRoute> connPoolControl) {
this(connPoolControl, 1);
}
/**
* Constructs a new LinearBackoffManager with the specified connection pool control and backoff increment.
*
* @param connPoolControl the connection pool control to be used by this LinearBackoffManager
* @param increment the backoff increment to be used when adjusting connection pool sizes
* @throws IllegalArgumentException if connPoolControl is {@code null} or increment is not positive
*/
public LinearBackoffManager(final ConnPoolControl<HttpRoute> connPoolControl, final int increment) {
super(connPoolControl);
this.increment = Args.positive(increment, "Increment");
routeAttempts = new ConcurrentHashMap<>();
}
@Override
public void backOff(final HttpRoute route) {
final Instant now = Instant.now();
if (shouldSkip(route, now)) {
if (LOG.isDebugEnabled()) {
LOG.debug("BackOff not applied for route: {}, cool-down period not elapsed", route);
}
return;
}
final AtomicInteger attempt = routeAttempts.compute(route, (r, oldValue) -> {
if (oldValue == null) {
return new AtomicInteger(1);
}
oldValue.incrementAndGet();
return oldValue;
});
getLastRouteBackoffs().put(route, now);
final int currentMax = getConnPerRoute().getMaxPerRoute(route);
getConnPerRoute().setMaxPerRoute(route, getBackedOffPoolSize(currentMax));
attempt.incrementAndGet();
if (LOG.isDebugEnabled()) {
LOG.debug("Backoff applied for route: {}, new max connections: {}", route, getConnPerRoute().getMaxPerRoute(route));
}
}
/**
* Adjusts the maximum number of connections for the specified route, decreasing it by the increment value.
* The method ensures that adjustments only happen after the cool-down period has passed since the last adjustment.
*
* @param route the HttpRoute for which the maximum number of connections will be decreased
*/
@Override
public void probe(final HttpRoute route) {
final Instant now = Instant.now();
if (shouldSkip(route, now)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Probe not applied for route: {}, cool-down period not elapsed", route);
}
return;
}
routeAttempts.compute(route, (r, oldValue) -> {
if (oldValue == null || oldValue.get() <= 1) {
return null;
}
oldValue.decrementAndGet();
return oldValue;
});
getLastRouteProbes().put(route, now);
final int currentMax = getConnPerRoute().getMaxPerRoute(route);
final int newMax = Math.max(currentMax - increment, getCap().get()); // Ensure the new max does not go below the cap
getConnPerRoute().setMaxPerRoute(route, newMax);
if (LOG.isDebugEnabled()) {
LOG.debug("Probe applied for route: {}, new max connections: {}", route, getConnPerRoute().getMaxPerRoute(route));
}
}
/**
* Determines whether an adjustment action (backoff or probe) should be skipped for the given HttpRoute based on the cool-down period.
* If the time elapsed since the last successful probe or backoff for the given route is less than the cool-down
* period, the method returns true. Otherwise, it returns false.
* <p>
* This method is used by both backOff() and probe() methods to enforce the cool-down period before making adjustments
* to the connection pool size.
*
* @param route the {@link HttpRoute} to check
* @param now the current {@link Instant} used to calculate the time since the last probe or backoff
* @return true if the cool-down period has not elapsed since the last probe or backoff, false otherwise
*/
private boolean shouldSkip(final HttpRoute route, final Instant now) {
final Instant lastProbe = getLastRouteProbes().getOrDefault(route, Instant.EPOCH);
final Instant lastBackoff = getLastRouteBackoffs().getOrDefault(route, Instant.EPOCH);
return Duration.between(lastProbe, now).compareTo(getCoolDown().get().toDuration()) < 0 ||
Duration.between(lastBackoff, now).compareTo(getCoolDown().get().toDuration()) < 0;
}
/**
* Returns the new pool size after applying the linear backoff algorithm.
* The new pool size is calculated by adding the increment value to the current pool size.
*
* @param curr the current pool size
* @return the new pool size after applying the linear backoff
*/
@Override
protected int getBackedOffPoolSize(final int curr) {
return curr + increment;
}
/**
* This method is not used in LinearBackoffManager's implementation.
* It is provided to fulfill the interface requirement and for potential future extensions or modifications
* of LinearBackoffManager that may use the backoff factor.
*
* @param d the backoff factor, not used in the current implementation
*/
@Override
public void setBackoffFactor(final double d) {
// Intentionally empty, as the backoff factor is not used in LinearBackoffManager
}
}

View File

@ -0,0 +1,157 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.classic;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.util.TimeValue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestExponentialBackoffManager {
private ExponentialBackoffManager impl;
private MockConnPoolControl connPerRoute;
private HttpRoute route;
private static final long DEFAULT_COOL_DOWN_MS = 5000; // Adjust this value to match the default cooldown period in ExponentialBackoffManager
@BeforeEach
public void setUp() {
connPerRoute = new MockConnPoolControl();
route = new HttpRoute(new HttpHost("localhost", 80));
impl = new ExponentialBackoffManager(connPerRoute);
impl.setPerHostConnectionCap(10);
impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS));
impl.setBackoffFactor(1.75); // Adjust this value to match the default growth rate in ExponentialBackoffManager
}
@Test
public void exponentialBackoffApplied() {
connPerRoute.setMaxPerRoute(route, 4);
impl.setBackoffFactor(2); // Sets the growth rate to 2 for this test
impl.backOff(route);
assertEquals(1, connPerRoute.getMaxPerRoute(route)); // Corrected expected value
}
@Test
public void exponentialGrowthRateIsConfigurable() throws InterruptedException {
final int customCoolDownMs = 500;
connPerRoute.setMaxPerRoute(route, 4);
impl.setBackoffFactor(0.5);
impl.setCoolDown(TimeValue.ofMilliseconds(customCoolDownMs));
impl.backOff(route);
assertEquals(2, connPerRoute.getMaxPerRoute(route));
Thread.sleep(customCoolDownMs + 100); // Sleep for a slightly longer than the custom cooldown period
impl.backOff(route);
assertEquals(1, connPerRoute.getMaxPerRoute(route));
}
@Test
public void doesNotIncreaseBeyondPerHostMaxOnProbe() {
connPerRoute.setDefaultMaxPerRoute(5);
connPerRoute.setMaxPerRoute(route, 5);
impl.setPerHostConnectionCap(5);
impl.probe(route);
assertEquals(5, connPerRoute.getMaxPerRoute(route));
}
@Test
public void backoffDoesNotAdjustDuringCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
impl.backOff(route);
final long max = connPerRoute.getMaxPerRoute(route);
Thread.sleep(1); // Sleep for 1 ms
impl.backOff(route);
assertEquals(max, connPerRoute.getMaxPerRoute(route));
}
@Test
public void backoffStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 8);
impl.backOff(route);
final long max = connPerRoute.getMaxPerRoute(route);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1); // Sleep for cooldown period + 1 ms
impl.backOff(route);
assertTrue(max == 1 || max > connPerRoute.getMaxPerRoute(route));
}
@Test
public void probeDoesNotAdjustDuringCooldownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
Thread.sleep(1); // Sleep for 1 ms
impl.probe(route);
assertEquals(max, connPerRoute.getMaxPerRoute(route));
}
@Test
public void probeStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 8);
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1); // Sleep for cooldown period + 1 ms
impl.probe(route);
assertTrue(max < connPerRoute.getMaxPerRoute(route));
}
@Test
public void willBackoffImmediatelyEvenAfterAProbe() {
connPerRoute.setMaxPerRoute(route, 8);
final long now = System.currentTimeMillis();
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
impl.backOff(route);
assertTrue(connPerRoute.getMaxPerRoute(route) < max);
}
@Test
public void coolDownPeriodIsConfigurable() throws InterruptedException {
final long cd = 500; // Fixed cooldown period of 500 milliseconds
impl.setCoolDown(TimeValue.ofMilliseconds(cd));
// Sleep for a short duration before starting the test to reduce potential timing issues
Thread.sleep(100);
// Probe and check if the connection count remains the same during the cooldown period
impl.probe(route);
final int max0 = connPerRoute.getMaxPerRoute(route);
Thread.sleep(cd / 2); // Sleep for half the cooldown period
impl.probe(route);
assertEquals(max0, connPerRoute.getMaxPerRoute(route));
// Probe and check if the connection count increases after the cooldown period
Thread.sleep(cd / 2 + 1); // Sleep for the remaining half of the cooldown period + 1 ms
impl.probe(route);
assertTrue(max0 < connPerRoute.getMaxPerRoute(route));
}
}

View File

@ -0,0 +1,162 @@
/*
* ====================================================================
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
* ====================================================================
*
* This software consists of voluntary contributions made by many
* individuals on behalf of the Apache Software Foundation. For more
* information on the Apache Software Foundation, please see
* <http://www.apache.org/>.
*
*/
package org.apache.hc.client5.http.impl.classic;
import org.apache.hc.client5.http.HttpRoute;
import org.apache.hc.core5.http.HttpHost;
import org.apache.hc.core5.util.TimeValue;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestLinearBackoffManager {
private LinearBackoffManager impl;
private MockConnPoolControl connPerRoute;
private HttpRoute route;
private static final long DEFAULT_COOL_DOWN_MS = 5000;
@BeforeEach
public void setUp() {
connPerRoute = new MockConnPoolControl();
route = new HttpRoute(new HttpHost("localhost", 80));
impl = new LinearBackoffManager(connPerRoute);
impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS));
}
@Test
public void incrementsConnectionsOnBackoff() {
final LinearBackoffManager impl = new LinearBackoffManager(connPerRoute);
impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS)); // Set the cool-down period
connPerRoute.setMaxPerRoute(route, 4);
impl.backOff(route);
assertEquals(5, connPerRoute.getMaxPerRoute(route));
}
@Test
public void decrementsConnectionsOnProbe() {
connPerRoute.setMaxPerRoute(route, 4);
impl.probe(route);
assertEquals(3, connPerRoute.getMaxPerRoute(route));
}
@Test
public void backoffDoesNotAdjustDuringCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
impl.backOff(route);
final long max = connPerRoute.getMaxPerRoute(route);
Thread.sleep(1); // Sleep for 1 ms
impl.backOff(route);
assertEquals(max, connPerRoute.getMaxPerRoute(route));
}
@Test
public void backoffStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
final LinearBackoffManager impl = new LinearBackoffManager(connPerRoute);
impl.setCoolDown(TimeValue.ofMilliseconds(DEFAULT_COOL_DOWN_MS)); // Set the cool-down period
connPerRoute.setMaxPerRoute(route, 4);
impl.backOff(route);
final int max1 = connPerRoute.getMaxPerRoute(route);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1); // Sleep for cooldown period + 1 ms
impl.backOff(route);
final int max2 = connPerRoute.getMaxPerRoute(route);
assertTrue(max2 > max1);
}
@Test
public void probeDoesNotAdjustDuringCooldownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
impl.probe(route);
final long max = connPerRoute.getMaxPerRoute(route);
Thread.sleep(1); // Sleep for 1 ms
impl.probe(route);
assertEquals(max, connPerRoute.getMaxPerRoute(route));
}
@Test
public void probeStillAdjustsAfterCoolDownPeriod() throws InterruptedException {
connPerRoute.setMaxPerRoute(route, 4);
impl.probe(route);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1); // Sleep for cooldown period + 1 ms
impl.probe(route);
final long newMax = connPerRoute.getMaxPerRoute(route);
assertEquals(2, newMax); // The cap is set to 2 by default
}
@Test
public void testSetPerHostConnectionCap() {
connPerRoute.setMaxPerRoute(route, 5);
impl.setPerHostConnectionCap(10); // Set the cap to a higher value
impl.backOff(route);
assertEquals(6, connPerRoute.getMaxPerRoute(route));
}
@Test
public void probeUpdatesRemainingAttemptsIndirectly() throws InterruptedException {
// Set initial max per route
connPerRoute.setMaxPerRoute(route, 4);
// Apply backOff twice
impl.backOff(route);
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1);
impl.backOff(route);
// Ensure that connection pool size has increased
assertEquals(6, connPerRoute.getMaxPerRoute(route));
// Apply probe once
impl.probe(route);
// Wait for a longer cool down period
Thread.sleep(DEFAULT_COOL_DOWN_MS * 2);
// Apply probe once more
impl.probe(route);
// Check that connection pool size has decreased once, indicating that the remaining attempts were updated
assertEquals(5, connPerRoute.getMaxPerRoute(route));
}
@Test
public void linearIncrementTest() throws InterruptedException {
final int initialMax = 4;
connPerRoute.setMaxPerRoute(route, initialMax);
for (int i = 1; i <= 5; i++) {
impl.backOff(route);
assertEquals(initialMax + i, connPerRoute.getMaxPerRoute(route));
Thread.sleep(DEFAULT_COOL_DOWN_MS + 1);
}
}
}