Fixes #8493: RemoveIdleDestinations's race condition and improve logging.
Signed-off-by: Ludovic Orban <lorban@bitronix.be>
This commit is contained in:
parent
935d894872
commit
de13ceff36
|
@ -275,7 +275,7 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
{
|
{
|
||||||
pending.decrementAndGet();
|
pending.decrementAndGet();
|
||||||
if (LOG.isDebugEnabled())
|
if (LOG.isDebugEnabled())
|
||||||
LOG.debug("Not creating connection as pool is full, pending: {}", pending);
|
LOG.debug("Not creating connection as pool {} is full, pending: {}", pool, pending);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -517,15 +517,17 @@ public abstract class AbstractConnectionPool extends ContainerLifeCycle implemen
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s@%x[c=%d/%d/%d,a=%d,i=%d,q=%d]",
|
return String.format("%s@%x[s=%s,c=%d/%d/%d,a=%d,i=%d,q=%d,p=%s]",
|
||||||
getClass().getSimpleName(),
|
getClass().getSimpleName(),
|
||||||
hashCode(),
|
hashCode(),
|
||||||
|
getState(),
|
||||||
getPendingConnectionCount(),
|
getPendingConnectionCount(),
|
||||||
getConnectionCount(),
|
getConnectionCount(),
|
||||||
getMaxConnectionCount(),
|
getMaxConnectionCount(),
|
||||||
getActiveConnectionCount(),
|
getActiveConnectionCount(),
|
||||||
getIdleConnectionCount(),
|
getIdleConnectionCount(),
|
||||||
destination.getQueuedRequestCount());
|
destination.getQueuedRequestCount(),
|
||||||
|
pool);
|
||||||
}
|
}
|
||||||
|
|
||||||
private class FutureConnection extends Promise.Completable<Connection>
|
private class FutureConnection extends Promise.Completable<Connection>
|
||||||
|
|
|
@ -74,6 +74,7 @@ import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
import org.eclipse.jetty.util.thread.QueuedThreadPool;
|
||||||
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
import org.eclipse.jetty.util.thread.ScheduledExecutorScheduler;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
|
import org.eclipse.jetty.util.thread.Sweeper;
|
||||||
import org.eclipse.jetty.util.thread.ThreadPool;
|
import org.eclipse.jetty.util.thread.ThreadPool;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
@ -142,12 +143,13 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
private boolean tcpNoDelay = true;
|
private boolean tcpNoDelay = true;
|
||||||
private boolean strictEventOrdering = false;
|
private boolean strictEventOrdering = false;
|
||||||
private HttpField encodingField;
|
private HttpField encodingField;
|
||||||
private boolean removeIdleDestinations = false;
|
private long destinationIdleTimeout;
|
||||||
private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
|
private String name = getClass().getSimpleName() + "@" + Integer.toHexString(hashCode());
|
||||||
private HttpCompliance httpCompliance = HttpCompliance.RFC7230;
|
private HttpCompliance httpCompliance = HttpCompliance.RFC7230;
|
||||||
private String defaultRequestContentType = "application/octet-stream";
|
private String defaultRequestContentType = "application/octet-stream";
|
||||||
private boolean useInputDirectByteBuffers = true;
|
private boolean useInputDirectByteBuffers = true;
|
||||||
private boolean useOutputDirectByteBuffers = true;
|
private boolean useOutputDirectByteBuffers = true;
|
||||||
|
private Sweeper destinationSweeper;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a HttpClient instance that can perform HTTP/1.1 requests to non-TLS and TLS destinations.
|
* Creates a HttpClient instance that can perform HTTP/1.1 requests to non-TLS and TLS destinations.
|
||||||
|
@ -222,7 +224,14 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
cookieStore = cookieManager.getCookieStore();
|
cookieStore = cookieManager.getCookieStore();
|
||||||
|
|
||||||
transport.setHttpClient(this);
|
transport.setHttpClient(this);
|
||||||
|
|
||||||
super.doStart();
|
super.doStart();
|
||||||
|
|
||||||
|
if (getDestinationIdleTimeout() > 0L)
|
||||||
|
{
|
||||||
|
destinationSweeper = new Sweeper(scheduler, 1000L);
|
||||||
|
destinationSweeper.start();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private CookieManager newCookieManager()
|
private CookieManager newCookieManager()
|
||||||
|
@ -233,6 +242,12 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws Exception
|
protected void doStop() throws Exception
|
||||||
{
|
{
|
||||||
|
if (destinationSweeper != null)
|
||||||
|
{
|
||||||
|
destinationSweeper.stop();
|
||||||
|
destinationSweeper = null;
|
||||||
|
}
|
||||||
|
|
||||||
decoderFactories.clear();
|
decoderFactories.clear();
|
||||||
handlers.clear();
|
handlers.clear();
|
||||||
|
|
||||||
|
@ -290,6 +305,11 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
return cookieManager;
|
return cookieManager;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Sweeper getDestinationSweeper()
|
||||||
|
{
|
||||||
|
return destinationSweeper;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the authentication store associated with this instance
|
* @return the authentication store associated with this instance
|
||||||
*/
|
*/
|
||||||
|
@ -529,21 +549,28 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
*/
|
*/
|
||||||
public HttpDestination resolveDestination(Origin origin)
|
public HttpDestination resolveDestination(Origin origin)
|
||||||
{
|
{
|
||||||
return destinations.computeIfAbsent(origin, o ->
|
return destinations.compute(origin, (k, v) ->
|
||||||
{
|
{
|
||||||
HttpDestination destination = getTransport().newHttpDestination(o);
|
if (v == null || v.stale())
|
||||||
// Start the destination before it's published to other threads.
|
{
|
||||||
addManaged(destination);
|
HttpDestination newDestination = getTransport().newHttpDestination(k);
|
||||||
if (LOG.isDebugEnabled())
|
// Start the destination before it's published to other threads.
|
||||||
LOG.debug("Created {}", destination);
|
addManaged(newDestination);
|
||||||
return destination;
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Created {}; existing: '{}'", newDestination, v);
|
||||||
|
return newDestination;
|
||||||
|
}
|
||||||
|
return v;
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
protected boolean removeDestination(HttpDestination destination)
|
protected boolean removeDestination(HttpDestination destination)
|
||||||
{
|
{
|
||||||
|
boolean removed = destinations.remove(destination.getOrigin(), destination);
|
||||||
removeBean(destination);
|
removeBean(destination);
|
||||||
return destinations.remove(destination.getOrigin(), destination);
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Removed {}; result: {}", destination, removed);
|
||||||
|
return removed;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1010,14 +1037,50 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
this.strictEventOrdering = strictEventOrdering;
|
this.strictEventOrdering = strictEventOrdering;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* The default value is 0
|
||||||
|
* @return the time in ms after which idle destinations are removed
|
||||||
|
* @see #setDestinationIdleTimeout(long)
|
||||||
|
*/
|
||||||
|
@ManagedAttribute("The time in ms after which idle destinations are removed, disabled when zero or negative")
|
||||||
|
public long getDestinationIdleTimeout()
|
||||||
|
{
|
||||||
|
return destinationIdleTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* <p>
|
||||||
|
* Whether destinations that have no connections (nor active nor idle) and no exchanges
|
||||||
|
* should be removed after the specified timeout.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* If the specified {@code destinationIdleTimeout} is 0 or negative, then the destinations
|
||||||
|
* are not removed.
|
||||||
|
* </p>
|
||||||
|
* <p>
|
||||||
|
* Avoids accumulating destinations when applications (e.g. a spider bot or web crawler)
|
||||||
|
* hit a lot of different destinations that won't be visited again.
|
||||||
|
* </p>
|
||||||
|
*
|
||||||
|
* @param destinationIdleTimeout the time in ms after which idle destinations are removed
|
||||||
|
*/
|
||||||
|
public void setDestinationIdleTimeout(long destinationIdleTimeout)
|
||||||
|
{
|
||||||
|
if (isStarted())
|
||||||
|
throw new IllegalStateException();
|
||||||
|
this.destinationIdleTimeout = destinationIdleTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return whether destinations that have no connections should be removed
|
* @return whether destinations that have no connections should be removed
|
||||||
* @see #setRemoveIdleDestinations(boolean)
|
* @see #setRemoveIdleDestinations(boolean)
|
||||||
|
* @deprecated replaced by {@link #getDestinationIdleTimeout()}
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
@ManagedAttribute("Whether idle destinations are removed")
|
@ManagedAttribute("Whether idle destinations are removed")
|
||||||
public boolean isRemoveIdleDestinations()
|
public boolean isRemoveIdleDestinations()
|
||||||
{
|
{
|
||||||
return removeIdleDestinations;
|
return destinationIdleTimeout > 0L;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -1031,10 +1094,12 @@ public class HttpClient extends ContainerLifeCycle
|
||||||
*
|
*
|
||||||
* @param removeIdleDestinations whether destinations that have no connections should be removed
|
* @param removeIdleDestinations whether destinations that have no connections should be removed
|
||||||
* @see org.eclipse.jetty.client.DuplexConnectionPool
|
* @see org.eclipse.jetty.client.DuplexConnectionPool
|
||||||
|
* @deprecated replaced by {@link #setDestinationIdleTimeout(long)}, calls the latter with a value of 10000 ms.
|
||||||
*/
|
*/
|
||||||
|
@Deprecated
|
||||||
public void setRemoveIdleDestinations(boolean removeIdleDestinations)
|
public void setRemoveIdleDestinations(boolean removeIdleDestinations)
|
||||||
{
|
{
|
||||||
this.removeIdleDestinations = removeIdleDestinations;
|
setDestinationIdleTimeout(removeIdleDestinations ? 10_000L : 0L);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
@ -21,6 +21,7 @@ import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Queue;
|
import java.util.Queue;
|
||||||
import java.util.concurrent.RejectedExecutionException;
|
import java.util.concurrent.RejectedExecutionException;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TimeoutException;
|
import java.util.concurrent.TimeoutException;
|
||||||
|
|
||||||
import org.eclipse.jetty.client.api.Connection;
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
|
@ -40,14 +41,16 @@ import org.eclipse.jetty.util.annotation.ManagedObject;
|
||||||
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
import org.eclipse.jetty.util.component.ContainerLifeCycle;
|
||||||
import org.eclipse.jetty.util.component.Dumpable;
|
import org.eclipse.jetty.util.component.Dumpable;
|
||||||
import org.eclipse.jetty.util.component.DumpableCollection;
|
import org.eclipse.jetty.util.component.DumpableCollection;
|
||||||
|
import org.eclipse.jetty.util.component.LifeCycle;
|
||||||
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
import org.eclipse.jetty.util.ssl.SslContextFactory;
|
||||||
|
import org.eclipse.jetty.util.thread.AutoLock;
|
||||||
import org.eclipse.jetty.util.thread.Scheduler;
|
import org.eclipse.jetty.util.thread.Scheduler;
|
||||||
import org.eclipse.jetty.util.thread.Sweeper;
|
import org.eclipse.jetty.util.thread.Sweeper;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
@ManagedObject
|
@ManagedObject
|
||||||
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable
|
public abstract class HttpDestination extends ContainerLifeCycle implements Destination, Closeable, Callback, Dumpable, Sweeper.Sweepable
|
||||||
{
|
{
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(HttpDestination.class);
|
private static final Logger LOG = LoggerFactory.getLogger(HttpDestination.class);
|
||||||
|
|
||||||
|
@ -60,7 +63,10 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
private final ClientConnectionFactory connectionFactory;
|
private final ClientConnectionFactory connectionFactory;
|
||||||
private final HttpField hostField;
|
private final HttpField hostField;
|
||||||
private final RequestTimeouts requestTimeouts;
|
private final RequestTimeouts requestTimeouts;
|
||||||
|
private final AutoLock staleLock = new AutoLock();
|
||||||
private ConnectionPool connectionPool;
|
private ConnectionPool connectionPool;
|
||||||
|
private boolean stale;
|
||||||
|
private long activeNanos;
|
||||||
|
|
||||||
public HttpDestination(HttpClient client, Origin origin, boolean intrinsicallySecure)
|
public HttpDestination(HttpClient client, Origin origin, boolean intrinsicallySecure)
|
||||||
{
|
{
|
||||||
|
@ -104,23 +110,78 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
connectionPool.accept(connection);
|
connectionPool.accept(connection);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public boolean stale()
|
||||||
|
{
|
||||||
|
try (AutoLock l = staleLock.lock())
|
||||||
|
{
|
||||||
|
boolean stale = this.stale;
|
||||||
|
if (!stale)
|
||||||
|
this.activeNanos = System.nanoTime();
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Stale check done with result {} on {}", stale, this);
|
||||||
|
return stale;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean sweep()
|
||||||
|
{
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Sweep check in progress on {}", this);
|
||||||
|
boolean remove = false;
|
||||||
|
try (AutoLock l = staleLock.lock())
|
||||||
|
{
|
||||||
|
boolean stale = exchanges.isEmpty() && connectionPool.isEmpty();
|
||||||
|
if (!stale)
|
||||||
|
{
|
||||||
|
this.activeNanos = System.nanoTime();
|
||||||
|
}
|
||||||
|
else if (isStaleDelayExpired())
|
||||||
|
{
|
||||||
|
this.stale = true;
|
||||||
|
remove = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (remove)
|
||||||
|
{
|
||||||
|
getHttpClient().removeDestination(this);
|
||||||
|
LifeCycle.stop(this);
|
||||||
|
}
|
||||||
|
if (LOG.isDebugEnabled())
|
||||||
|
LOG.debug("Sweep check done with result {} on {}", remove, this);
|
||||||
|
return remove;
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isStaleDelayExpired()
|
||||||
|
{
|
||||||
|
assert staleLock.isHeldByCurrentThread();
|
||||||
|
long destinationIdleTimeout = TimeUnit.MILLISECONDS.toNanos(getHttpClient().getDestinationIdleTimeout());
|
||||||
|
return System.nanoTime() - activeNanos >= destinationIdleTimeout;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStart() throws Exception
|
protected void doStart() throws Exception
|
||||||
{
|
{
|
||||||
this.connectionPool = newConnectionPool(client);
|
this.connectionPool = newConnectionPool(client);
|
||||||
addBean(connectionPool, true);
|
addBean(connectionPool, true);
|
||||||
super.doStart();
|
super.doStart();
|
||||||
Sweeper sweeper = client.getBean(Sweeper.class);
|
Sweeper connectionPoolSweeper = client.getBean(Sweeper.class);
|
||||||
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
|
if (connectionPoolSweeper != null && connectionPool instanceof Sweeper.Sweepable)
|
||||||
sweeper.offer((Sweeper.Sweepable)connectionPool);
|
connectionPoolSweeper.offer((Sweeper.Sweepable)connectionPool);
|
||||||
|
Sweeper destinationSweeper = getHttpClient().getDestinationSweeper();
|
||||||
|
if (destinationSweeper != null)
|
||||||
|
destinationSweeper.offer(this);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
protected void doStop() throws Exception
|
protected void doStop() throws Exception
|
||||||
{
|
{
|
||||||
Sweeper sweeper = client.getBean(Sweeper.class);
|
Sweeper destinationSweeper = getHttpClient().getDestinationSweeper();
|
||||||
if (sweeper != null && connectionPool instanceof Sweeper.Sweepable)
|
if (destinationSweeper != null)
|
||||||
sweeper.remove((Sweeper.Sweepable)connectionPool);
|
destinationSweeper.remove(this);
|
||||||
|
Sweeper connectionPoolSweeper = client.getBean(Sweeper.class);
|
||||||
|
if (connectionPoolSweeper != null && connectionPool instanceof Sweeper.Sweepable)
|
||||||
|
connectionPoolSweeper.remove((Sweeper.Sweepable)connectionPool);
|
||||||
super.doStop();
|
super.doStop();
|
||||||
removeBean(connectionPool);
|
removeBean(connectionPool);
|
||||||
}
|
}
|
||||||
|
@ -449,11 +510,7 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
{
|
{
|
||||||
boolean removed = connectionPool.remove(connection);
|
boolean removed = connectionPool.remove(connection);
|
||||||
|
|
||||||
if (getHttpExchanges().isEmpty())
|
if (removed)
|
||||||
{
|
|
||||||
tryRemoveIdleDestination();
|
|
||||||
}
|
|
||||||
else if (removed)
|
|
||||||
{
|
{
|
||||||
// Process queued requests that may be waiting.
|
// Process queued requests that may be waiting.
|
||||||
// We may create a connection that is not
|
// We may create a connection that is not
|
||||||
|
@ -478,22 +535,6 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
{
|
{
|
||||||
exchange.getRequest().abort(cause);
|
exchange.getRequest().abort(cause);
|
||||||
}
|
}
|
||||||
if (exchanges.isEmpty())
|
|
||||||
tryRemoveIdleDestination();
|
|
||||||
}
|
|
||||||
|
|
||||||
private void tryRemoveIdleDestination()
|
|
||||||
{
|
|
||||||
if (getHttpClient().isRemoveIdleDestinations() && connectionPool.isEmpty())
|
|
||||||
{
|
|
||||||
// There is a race condition between this thread removing the destination
|
|
||||||
// and another thread queueing a request to this same destination.
|
|
||||||
// If this destination is removed, but the request queued, a new connection
|
|
||||||
// will be opened, the exchange will be executed and eventually the connection
|
|
||||||
// will idle timeout and be closed. Meanwhile a new destination will be created
|
|
||||||
// in HttpClient and will be used for other requests.
|
|
||||||
getHttpClient().removeDestination(this);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -507,16 +548,39 @@ public abstract class HttpDestination extends ContainerLifeCycle implements Dest
|
||||||
return getOrigin().asString();
|
return getOrigin().asString();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ManagedAttribute("For how long this destination has been idle in ms")
|
||||||
|
public long getIdle()
|
||||||
|
{
|
||||||
|
if (getHttpClient().getDestinationIdleTimeout() <= 0L)
|
||||||
|
return -1;
|
||||||
|
try (AutoLock l = staleLock.lock())
|
||||||
|
{
|
||||||
|
return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - activeNanos);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@ManagedAttribute("Whether this destinations is stale")
|
||||||
|
public boolean isStale()
|
||||||
|
{
|
||||||
|
try (AutoLock l = staleLock.lock())
|
||||||
|
{
|
||||||
|
return this.stale;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String toString()
|
public String toString()
|
||||||
{
|
{
|
||||||
return String.format("%s[%s]@%x%s,queue=%d,pool=%s",
|
return String.format("%s[%s]@%x%s,state=%s,queue=%d,pool=%s,stale=%b,idle=%d",
|
||||||
HttpDestination.class.getSimpleName(),
|
HttpDestination.class.getSimpleName(),
|
||||||
getOrigin(),
|
getOrigin(),
|
||||||
hashCode(),
|
hashCode(),
|
||||||
proxy == null ? "" : "(via " + proxy + ")",
|
proxy == null ? "" : "(via " + proxy + ")",
|
||||||
|
getState(),
|
||||||
getQueuedRequestCount(),
|
getQueuedRequestCount(),
|
||||||
getConnectionPool());
|
getConnectionPool(),
|
||||||
|
isStale(),
|
||||||
|
getIdle());
|
||||||
}
|
}
|
||||||
|
|
||||||
@FunctionalInterface
|
@FunctionalInterface
|
||||||
|
|
|
@ -19,6 +19,7 @@ import java.util.concurrent.RejectedExecutionException;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.function.Supplier;
|
import java.util.function.Supplier;
|
||||||
|
|
||||||
|
import org.awaitility.Awaitility;
|
||||||
import org.eclipse.jetty.client.api.Connection;
|
import org.eclipse.jetty.client.api.Connection;
|
||||||
import org.eclipse.jetty.client.api.ContentResponse;
|
import org.eclipse.jetty.client.api.ContentResponse;
|
||||||
import org.eclipse.jetty.client.api.Destination;
|
import org.eclipse.jetty.client.api.Destination;
|
||||||
|
@ -291,6 +292,9 @@ public class DuplexHttpDestinationTest extends AbstractHttpClientServerTest
|
||||||
public void testDestinationIsRemoved(Scenario scenario) throws Exception
|
public void testDestinationIsRemoved(Scenario scenario) throws Exception
|
||||||
{
|
{
|
||||||
start(scenario, new EmptyServerHandler());
|
start(scenario, new EmptyServerHandler());
|
||||||
|
client.stop();
|
||||||
|
client.setDestinationIdleTimeout(1000);
|
||||||
|
client.start();
|
||||||
|
|
||||||
String host = "localhost";
|
String host = "localhost";
|
||||||
int port = connector.getLocalPort();
|
int port = connector.getLocalPort();
|
||||||
|
@ -305,7 +309,7 @@ public class DuplexHttpDestinationTest extends AbstractHttpClientServerTest
|
||||||
Destination destinationAfter = client.resolveDestination(request);
|
Destination destinationAfter = client.resolveDestination(request);
|
||||||
assertSame(destinationBefore, destinationAfter);
|
assertSame(destinationBefore, destinationAfter);
|
||||||
|
|
||||||
client.setRemoveIdleDestinations(true);
|
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> client.getDestinations().isEmpty());
|
||||||
|
|
||||||
request = client.newRequest(host, port)
|
request = client.newRequest(host, port)
|
||||||
.scheme(scenario.getScheme())
|
.scheme(scenario.getScheme())
|
||||||
|
@ -323,21 +327,19 @@ public class DuplexHttpDestinationTest extends AbstractHttpClientServerTest
|
||||||
public void testDestinationIsRemovedAfterConnectionError(Scenario scenario) throws Exception
|
public void testDestinationIsRemovedAfterConnectionError(Scenario scenario) throws Exception
|
||||||
{
|
{
|
||||||
start(scenario, new EmptyServerHandler());
|
start(scenario, new EmptyServerHandler());
|
||||||
|
client.stop();
|
||||||
|
client.setDestinationIdleTimeout(1000);
|
||||||
|
client.start();
|
||||||
|
|
||||||
String host = "localhost";
|
String host = "localhost";
|
||||||
int port = connector.getLocalPort();
|
int port = connector.getLocalPort();
|
||||||
client.setRemoveIdleDestinations(true);
|
|
||||||
assertTrue(client.getDestinations().isEmpty(), "Destinations of a fresh client must be empty");
|
assertTrue(client.getDestinations().isEmpty(), "Destinations of a fresh client must be empty");
|
||||||
|
|
||||||
server.stop();
|
server.stop();
|
||||||
Request request = client.newRequest(host, port).scheme(scenario.getScheme());
|
Request request = client.newRequest(host, port).scheme(scenario.getScheme());
|
||||||
assertThrows(Exception.class, request::send);
|
assertThrows(Exception.class, request::send);
|
||||||
|
|
||||||
long deadline = System.nanoTime() + TimeUnit.SECONDS.toNanos(1);
|
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> client.getDestinations().isEmpty());
|
||||||
while (!client.getDestinations().isEmpty() && System.nanoTime() < deadline)
|
|
||||||
{
|
|
||||||
Thread.sleep(10);
|
|
||||||
}
|
|
||||||
assertTrue(client.getDestinations().isEmpty(), "Destination must be removed after connection error");
|
assertTrue(client.getDestinations().isEmpty(), "Destination must be removed after connection error");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -70,7 +70,6 @@ public class HttpClientProxyProtocolTest
|
||||||
clientThreads.setName("client");
|
clientThreads.setName("client");
|
||||||
client = new HttpClient();
|
client = new HttpClient();
|
||||||
client.setExecutor(clientThreads);
|
client.setExecutor(clientThreads);
|
||||||
client.setRemoveIdleDestinations(false);
|
|
||||||
client.start();
|
client.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -94,6 +94,7 @@ import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
import org.junit.jupiter.params.ParameterizedTest;
|
import org.junit.jupiter.params.ParameterizedTest;
|
||||||
import org.junit.jupiter.params.provider.ArgumentsSource;
|
import org.junit.jupiter.params.provider.ArgumentsSource;
|
||||||
|
|
||||||
|
import static org.awaitility.Awaitility.await;
|
||||||
import static org.hamcrest.MatcherAssert.assertThat;
|
import static org.hamcrest.MatcherAssert.assertThat;
|
||||||
import static org.hamcrest.Matchers.containsString;
|
import static org.hamcrest.Matchers.containsString;
|
||||||
import static org.hamcrest.Matchers.instanceOf;
|
import static org.hamcrest.Matchers.instanceOf;
|
||||||
|
@ -528,6 +529,41 @@ public class HttpClientTest extends AbstractHttpClientServerTest
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ParameterizedTest
|
||||||
|
@ArgumentsSource(ScenarioProvider.class)
|
||||||
|
public void testRetryWithDestinationIdleTimeoutEnabled(Scenario scenario) throws Exception
|
||||||
|
{
|
||||||
|
start(scenario, new EmptyServerHandler());
|
||||||
|
client.stop();
|
||||||
|
client.setDestinationIdleTimeout(1000);
|
||||||
|
client.setIdleTimeout(1000);
|
||||||
|
client.setMaxConnectionsPerDestination(1);
|
||||||
|
client.start();
|
||||||
|
|
||||||
|
try (StacklessLogging ignored = new StacklessLogging(org.eclipse.jetty.server.HttpChannel.class))
|
||||||
|
{
|
||||||
|
client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.scheme(scenario.getScheme())
|
||||||
|
.path("/one")
|
||||||
|
.send();
|
||||||
|
|
||||||
|
int idleTimeout = 100;
|
||||||
|
Thread.sleep(idleTimeout * 2);
|
||||||
|
|
||||||
|
// After serving a request over a connection that hasn't timed out, serving a second
|
||||||
|
// request with a shorter idle timeout will make the connection timeout immediately
|
||||||
|
// after being taken out of the pool. This triggers the retry mechanism.
|
||||||
|
client.newRequest("localhost", connector.getLocalPort())
|
||||||
|
.scheme(scenario.getScheme())
|
||||||
|
.path("/two")
|
||||||
|
.idleTimeout(idleTimeout, TimeUnit.MILLISECONDS)
|
||||||
|
.send();
|
||||||
|
}
|
||||||
|
|
||||||
|
// Wait for the sweeper to remove the idle HttpDestination.
|
||||||
|
await().atMost(5, TimeUnit.SECONDS).until(() -> client.getDestinations().isEmpty());
|
||||||
|
}
|
||||||
|
|
||||||
@ParameterizedTest
|
@ParameterizedTest
|
||||||
@ArgumentsSource(ScenarioProvider.class)
|
@ArgumentsSource(ScenarioProvider.class)
|
||||||
public void testExchangeIsCompleteOnlyWhenBothRequestAndResponseAreComplete(Scenario scenario) throws Exception
|
public void testExchangeIsCompleteOnlyWhenBothRequestAndResponseAreComplete(Scenario scenario) throws Exception
|
||||||
|
|
|
@ -127,7 +127,7 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
||||||
public Pool(StrategyType strategyType, int maxEntries, boolean cache)
|
public Pool(StrategyType strategyType, int maxEntries, boolean cache)
|
||||||
{
|
{
|
||||||
this.maxEntries = maxEntries;
|
this.maxEntries = maxEntries;
|
||||||
this.strategyType = strategyType;
|
this.strategyType = Objects.requireNonNull(strategyType);
|
||||||
this.cache = cache ? new ThreadLocal<>() : null;
|
this.cache = cache ? new ThreadLocal<>() : null;
|
||||||
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
|
this.nextIndex = strategyType == StrategyType.ROUND_ROBIN ? new AtomicInteger() : null;
|
||||||
}
|
}
|
||||||
|
@ -336,14 +336,25 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
||||||
try (AutoLock l = lock.lock())
|
try (AutoLock l = lock.lock())
|
||||||
{
|
{
|
||||||
if (closed)
|
if (closed)
|
||||||
|
{
|
||||||
|
if (LOGGER.isDebugEnabled())
|
||||||
|
LOGGER.debug("{} is closed, returning null reserved entry", this);
|
||||||
return null;
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
// If we have no space
|
// If we have no space
|
||||||
if (maxEntries > 0 && entries.size() >= maxEntries)
|
int entriesSize = entries.size();
|
||||||
|
if (maxEntries > 0 && entriesSize >= maxEntries)
|
||||||
|
{
|
||||||
|
if (LOGGER.isDebugEnabled())
|
||||||
|
LOGGER.debug("{} has no space: {} >= {}, returning null reserved entry", this, entriesSize, maxEntries);
|
||||||
return null;
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
Entry entry = newEntry();
|
Entry entry = newEntry();
|
||||||
entries.add(entry);
|
entries.add(entry);
|
||||||
|
if (LOGGER.isDebugEnabled())
|
||||||
|
LOGGER.debug("{} returning new reserved entry {}", this, entry);
|
||||||
return entry;
|
return entry;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -514,6 +525,9 @@ public class Pool<T> implements AutoCloseable, Dumpable
|
||||||
@Override
|
@Override
|
||||||
public void close()
|
public void close()
|
||||||
{
|
{
|
||||||
|
if (LOGGER.isDebugEnabled())
|
||||||
|
LOGGER.debug("Closing {}", this);
|
||||||
|
|
||||||
List<Entry> copy;
|
List<Entry> copy;
|
||||||
try (AutoLock l = lock.lock())
|
try (AutoLock l = lock.lock())
|
||||||
{
|
{
|
||||||
|
|
Loading…
Reference in New Issue