Refactor HttpClient synchronized sections for virtual threads (#476)

- Replaced `synchronized` blocks with `ReentrantLock` in `LeaseRequest` to better support virtual threads introduced in JDK 21.
- Ensured each `LeaseRequest` instance has its own unique lock for maintaining original synchronization semantics.
- Addressed potential performance and deadlock issues with virtual threads by using explicit lock primitives from `java.util.concurrent.locks`.
This commit is contained in:
Arturo Bernal 2023-08-27 12:57:15 +02:00 committed by Oleg Kalnichevski
parent 097c17b78b
commit 8466b19861
9 changed files with 416 additions and 241 deletions

View File

@ -29,6 +29,7 @@ package org.apache.hc.client5.http.impl.cache;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
@ -53,9 +54,12 @@ public class BasicHttpCacheStorage implements HttpCacheStorage {
private final CacheMap entries;
private final ReentrantLock lock;
public BasicHttpCacheStorage(final CacheConfig config) {
super();
this.entries = new CacheMap(config.getMaxCacheEntries());
this.lock = new ReentrantLock();
}
/**
@ -67,9 +71,14 @@ public class BasicHttpCacheStorage implements HttpCacheStorage {
* HttpCacheEntry to place in the cache
*/
@Override
public synchronized void putEntry(
public void putEntry(
final String url, final HttpCacheEntry entry) throws ResourceIOException {
lock.lock();
try {
entries.put(url, entry);
} finally {
lock.unlock();
}
}
/**
@ -80,8 +89,13 @@ public class BasicHttpCacheStorage implements HttpCacheStorage {
* @return HttpCacheEntry if one exists, or null for cache miss
*/
@Override
public synchronized HttpCacheEntry getEntry(final String url) throws ResourceIOException {
public HttpCacheEntry getEntry(final String url) throws ResourceIOException {
lock.lock();
try {
return entries.get(url);
} finally {
lock.unlock();
}
}
/**
@ -91,15 +105,26 @@ public class BasicHttpCacheStorage implements HttpCacheStorage {
* Url that is the cache key
*/
@Override
public synchronized void removeEntry(final String url) throws ResourceIOException {
public void removeEntry(final String url) throws ResourceIOException {
lock.lock();
try {
entries.remove(url);
} finally {
lock.unlock();
}
}
@Override
public synchronized void updateEntry(
public void updateEntry(
final String url, final HttpCacheCASOperation casOperation) throws ResourceIOException {
lock.lock();
try {
final HttpCacheEntry existingEntry = entries.get(url);
entries.put(url, casOperation.execute(existingEntry));
} finally {
lock.unlock();
}
}
@Override

View File

@ -32,6 +32,7 @@ import java.security.NoSuchAlgorithmException;
import java.security.SecureRandom;
import java.util.Formatter;
import java.util.Locale;
import java.util.concurrent.locks.ReentrantLock;
/**
* Should produce reasonably unique tokens.
@ -43,6 +44,8 @@ class BasicIdGenerator {
private long count;
private final ReentrantLock lock;
public BasicIdGenerator() {
super();
String hostname;
@ -58,9 +61,12 @@ class BasicIdGenerator {
throw new Error(ex);
}
this.rnd.setSeed(System.currentTimeMillis());
this.lock = new ReentrantLock();
}
public synchronized void generate(final StringBuilder buffer) {
public void generate(final StringBuilder buffer) {
lock.lock();
try {
this.count++;
final int rndnum = this.rnd.nextInt();
buffer.append(System.currentTimeMillis());
@ -70,6 +76,9 @@ class BasicIdGenerator {
}
buffer.append('.');
buffer.append(this.hostname);
} finally {
lock.unlock();
}
}
public String generate() {

View File

@ -35,6 +35,7 @@ import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.schedule.ConcurrentCountMap;
import org.apache.hc.client5.http.schedule.SchedulingStrategy;
@ -49,6 +50,8 @@ import org.slf4j.LoggerFactory;
*/
class CacheRevalidatorBase implements Closeable {
private final ReentrantLock lock;
interface ScheduledExecutor {
Future<?> schedule(Runnable command, TimeValue timeValue) throws RejectedExecutionException;
@ -103,6 +106,7 @@ class CacheRevalidatorBase implements Closeable {
this.schedulingStrategy = schedulingStrategy;
this.pendingRequest = new HashSet<>();
this.failureCache = new ConcurrentCountMap<>();
this.lock = new ReentrantLock();
}
/**
@ -119,7 +123,8 @@ class CacheRevalidatorBase implements Closeable {
* Schedules an asynchronous re-validation
*/
void scheduleRevalidation(final String cacheKey, final Runnable command) {
synchronized (pendingRequest) {
lock.lock();
try {
if (!pendingRequest.contains(cacheKey)) {
final int consecutiveFailedAttempts = failureCache.getCount(cacheKey);
final TimeValue executionTime = schedulingStrategy.schedule(consecutiveFailedAttempts);
@ -130,6 +135,8 @@ class CacheRevalidatorBase implements Closeable {
LOG.debug("Revalidation of cache entry with key {} could not be scheduled", cacheKey, ex);
}
}
} finally {
lock.unlock();
}
}
@ -145,21 +152,30 @@ class CacheRevalidatorBase implements Closeable {
void jobSuccessful(final String identifier) {
failureCache.resetCount(identifier);
synchronized (pendingRequest) {
lock.lock();
try {
pendingRequest.remove(identifier);
} finally {
lock.unlock();
}
}
void jobFailed(final String identifier) {
failureCache.increaseCount(identifier);
synchronized (pendingRequest) {
lock.lock();
try {
pendingRequest.remove(identifier);
} finally {
lock.unlock();
}
}
Set<String> getScheduledIdentifiers() {
synchronized (pendingRequest) {
lock.lock();
try {
return new HashSet<>(pendingRequest);
} finally {
lock.unlock();
}
}

View File

@ -34,6 +34,7 @@ import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.cache.HttpCacheCASOperation;
import org.apache.hc.client5.http.cache.HttpCacheEntry;
@ -82,12 +83,15 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
private final Set<ResourceReference> resources;
private final AtomicBoolean active;
private final ReentrantLock lock;
public ManagedHttpCacheStorage(final CacheConfig config) {
super();
this.entries = new CacheMap(config.getMaxCacheEntries());
this.morque = new ReferenceQueue<>();
this.resources = new HashSet<>();
this.active = new AtomicBoolean(true);
this.lock = new ReentrantLock();
}
private void ensureValidState() {
@ -110,9 +114,12 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
Args.notNull(url, "URL");
Args.notNull(entry, "Cache entry");
ensureValidState();
synchronized (this) {
lock.lock();
try {
this.entries.put(url, entry);
keepResourceReference(entry);
} finally {
lock.unlock();
}
}
@ -120,8 +127,11 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
public HttpCacheEntry getEntry(final String url) throws ResourceIOException {
Args.notNull(url, "URL");
ensureValidState();
synchronized (this) {
lock.lock();
try {
return this.entries.get(url);
} finally {
lock.unlock();
}
}
@ -129,10 +139,13 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
public void removeEntry(final String url) throws ResourceIOException {
Args.notNull(url, "URL");
ensureValidState();
synchronized (this) {
lock.lock();
try {
// Cannot deallocate the associated resources immediately as the
// cache entry may still be in use
this.entries.remove(url);
} finally {
lock.unlock();
}
}
@ -143,13 +156,16 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
Args.notNull(url, "URL");
Args.notNull(casOperation, "CAS operation");
ensureValidState();
synchronized (this) {
lock.lock();
try {
final HttpCacheEntry existing = this.entries.get(url);
final HttpCacheEntry updated = casOperation.execute(existing);
this.entries.put(url, updated);
if (existing != updated) {
keepResourceReference(updated);
}
} finally {
lock.unlock();
}
}
@ -170,8 +186,11 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
if (isActive()) {
ResourceReference ref;
while ((ref = (ResourceReference) this.morque.poll()) != null) {
synchronized (this) {
lock.lock();
try {
this.resources.remove(ref);
} finally {
lock.unlock();
}
ref.getResource().dispose();
}
@ -180,7 +199,8 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
public void shutdown() {
if (compareAndSet()) {
synchronized (this) {
lock.lock();
try {
this.entries.clear();
for (final ResourceReference ref: this.resources) {
ref.getResource().dispose();
@ -188,6 +208,8 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
this.resources.clear();
while (this.morque.poll() != null) {
}
} finally {
lock.unlock();
}
}
}
@ -195,12 +217,15 @@ public class ManagedHttpCacheStorage implements HttpCacheStorage, Closeable {
@Override
public void close() {
if (compareAndSet()) {
synchronized (this) {
lock.lock();
try {
ResourceReference ref;
while ((ref = (ResourceReference) this.morque.poll()) != null) {
this.resources.remove(ref);
ref.getResource().dispose();
}
} finally {
lock.unlock();
}
}
}

View File

@ -27,6 +27,8 @@
package org.apache.hc.client5.http.impl.cookie;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.cookie.CookieSpec;
import org.apache.hc.client5.http.cookie.CookieSpecFactory;
import org.apache.hc.core5.annotation.Contract;
@ -43,17 +45,23 @@ public class IgnoreCookieSpecFactory implements CookieSpecFactory {
private volatile CookieSpec cookieSpec;
private final ReentrantLock lock;
public IgnoreCookieSpecFactory() {
super();
this.lock = new ReentrantLock();
}
@Override
public CookieSpec create(final HttpContext context) {
if (cookieSpec == null) {
synchronized (this) {
lock.lock();
try {
if (cookieSpec == null) {
this.cookieSpec = IgnoreSpecSpec.INSTANCE;
}
} finally {
lock.unlock();
}
}
return this.cookieSpec;

View File

@ -27,6 +27,8 @@
package org.apache.hc.client5.http.impl.cookie;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.cookie.Cookie;
import org.apache.hc.client5.http.cookie.CookieOrigin;
import org.apache.hc.client5.http.cookie.CookieSpec;
@ -48,6 +50,8 @@ import org.apache.hc.core5.http.protocol.HttpContext;
@Contract(threading = ThreadingBehavior.SAFE)
public class RFC6265CookieSpecFactory implements CookieSpecFactory {
private final ReentrantLock lock;
public enum CompatibilityLevel {
STRICT,
RELAXED,
@ -65,6 +69,7 @@ public class RFC6265CookieSpecFactory implements CookieSpecFactory {
super();
this.compatibilityLevel = compatibilityLevel != null ? compatibilityLevel : CompatibilityLevel.RELAXED;
this.publicSuffixMatcher = publicSuffixMatcher;
this.lock = new ReentrantLock();
}
public RFC6265CookieSpecFactory(final PublicSuffixMatcher publicSuffixMatcher) {
@ -78,7 +83,8 @@ public class RFC6265CookieSpecFactory implements CookieSpecFactory {
@Override
public CookieSpec create(final HttpContext context) {
if (cookieSpec == null) {
synchronized (this) {
lock.lock();
try {
if (cookieSpec == null) {
switch (this.compatibilityLevel) {
case STRICT:
@ -118,6 +124,8 @@ public class RFC6265CookieSpecFactory implements CookieSpecFactory {
LaxExpiresHandler.INSTANCE);
}
}
} finally {
lock.unlock();
}
}
return this.cookieSpec;

View File

@ -35,6 +35,7 @@ import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.HttpRoute;
@ -103,6 +104,8 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
private final HttpConnectionFactory<ManagedHttpClientConnection> connFactory;
private final String id;
private final ReentrantLock lock;
private ManagedHttpClientConnection conn;
private HttpRoute route;
private Object state;
@ -147,6 +150,7 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
this.connectionConfig = ConnectionConfig.DEFAULT;
this.tlsConfig = TlsConfig.DEFAULT;
this.closed = new AtomicBoolean(false);
this.lock = new ReentrantLock();
}
public BasicHttpClientConnectionManager(
@ -184,40 +188,71 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
return state;
}
public synchronized SocketConfig getSocketConfig() {
public SocketConfig getSocketConfig() {
lock.lock();
try {
return socketConfig;
} finally {
lock.unlock();
}
}
public synchronized void setSocketConfig(final SocketConfig socketConfig) {
public void setSocketConfig(final SocketConfig socketConfig) {
lock.lock();
try {
this.socketConfig = socketConfig != null ? socketConfig : SocketConfig.DEFAULT;
} finally {
lock.unlock();
}
}
/**
* @since 5.2
*/
public synchronized ConnectionConfig getConnectionConfig() {
public ConnectionConfig getConnectionConfig() {
lock.lock();
try {
return connectionConfig;
} finally {
lock.unlock();
}
}
/**
* @since 5.2
*/
public synchronized void setConnectionConfig(final ConnectionConfig connectionConfig) {
public void setConnectionConfig(final ConnectionConfig connectionConfig) {
lock.lock();
try {
this.connectionConfig = connectionConfig != null ? connectionConfig : ConnectionConfig.DEFAULT;
} finally {
lock.unlock();
}
}
/**
* @since 5.2
*/
public synchronized TlsConfig getTlsConfig() {
public TlsConfig getTlsConfig() {
lock.lock();
try {
return tlsConfig;
} finally {
lock.unlock();
}
}
/**
* @since 5.2
*/
public synchronized void setTlsConfig(final TlsConfig tlsConfig) {
public void setTlsConfig(final TlsConfig tlsConfig) {
lock.lock();
try {
this.tlsConfig = tlsConfig != null ? tlsConfig : TlsConfig.DEFAULT;
} finally {
lock.unlock();
}
}
public LeaseRequest lease(final String id, final HttpRoute route, final Object state) {
@ -246,7 +281,9 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
};
}
private synchronized void closeConnection(final CloseMode closeMode) {
private void closeConnection(final CloseMode closeMode) {
lock.lock();
try {
if (this.conn != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("{} Closing connection {}", id, closeMode);
@ -254,6 +291,9 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
this.conn.close(closeMode);
this.conn = null;
}
} finally {
lock.unlock();
}
}
private void checkExpiry() {
@ -298,7 +338,9 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
}
}
synchronized ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
ManagedHttpClientConnection getConnection(final HttpRoute route, final Object state) throws IOException {
lock.lock();
try {
Asserts.check(!isClosed(), "Connection manager has been shut down");
if (LOG.isDebugEnabled()) {
LOG.debug("{} Get connection for route {}", id, route);
@ -322,6 +364,9 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
LOG.debug("{} Using connection {}", id, conn);
}
return this.conn;
} finally {
lock.unlock();
}
}
private InternalConnectionEndpoint cast(final ConnectionEndpoint endpoint) {
@ -332,7 +377,9 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
}
@Override
public synchronized void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
public void release(final ConnectionEndpoint endpoint, final Object state, final TimeValue keepAlive) {
lock.lock();
try {
Args.notNull(endpoint, "Managed endpoint");
final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
final ManagedHttpClientConnection conn = internalEndpoint.detach();
@ -374,10 +421,15 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
} finally {
this.leased = false;
}
} finally {
lock.unlock();
}
}
@Override
public synchronized void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
public void connect(final ConnectionEndpoint endpoint, final TimeValue timeout, final HttpContext context) throws IOException {
lock.lock();
try {
Args.notNull(endpoint, "Endpoint");
final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
@ -411,12 +463,17 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
if (socketTimeout != null) {
connection.setSocketTimeout(socketTimeout);
}
} finally {
lock.unlock();
}
}
@Override
public synchronized void upgrade(
public void upgrade(
final ConnectionEndpoint endpoint,
final HttpContext context) throws IOException {
lock.lock();
try {
Args.notNull(endpoint, "Endpoint");
Args.notNull(route, "HTTP route");
final InternalConnectionEndpoint internalEndpoint = cast(endpoint);
@ -425,18 +482,28 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
internalEndpoint.getRoute().getTargetHost(),
tlsConfig,
context);
} finally {
lock.unlock();
}
}
public synchronized void closeExpired() {
public void closeExpired() {
lock.lock();
try {
if (isClosed()) {
return;
}
if (!this.leased) {
checkExpiry();
}
} finally {
lock.unlock();
}
}
public synchronized void closeIdle(final TimeValue idleTime) {
public void closeIdle(final TimeValue idleTime) {
lock.lock();
try {
Args.notNull(idleTime, "Idle time");
if (isClosed()) {
return;
@ -451,6 +518,9 @@ public class BasicHttpClientConnectionManager implements HttpClientConnectionMan
closeConnection(CloseMode.GRACEFUL);
}
}
} finally {
lock.unlock();
}
}
/**

View File

@ -33,6 +33,7 @@ import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.client5.http.DnsResolver;
import org.apache.hc.client5.http.HttpRoute;
@ -297,12 +298,16 @@ public class PoolingHttpClientConnectionManager
}
final Future<PoolEntry<HttpRoute, ManagedHttpClientConnection>> leaseFuture = this.pool.lease(route, state, requestTimeout, null);
return new LeaseRequest() {
// Using a ReentrantLock specific to each LeaseRequest instance to maintain the original
// synchronization semantics. This ensures that each LeaseRequest has its own unique lock.
private final ReentrantLock lock = new ReentrantLock();
private volatile ConnectionEndpoint endpoint;
@Override
public synchronized ConnectionEndpoint get(
public ConnectionEndpoint get(
final Timeout timeout) throws InterruptedException, ExecutionException, TimeoutException {
lock.lock();
try {
Args.notNull(timeout, "Operation timeout");
if (this.endpoint != null) {
return this.endpoint;
@ -367,6 +372,9 @@ public class PoolingHttpClientConnectionManager
pool.release(poolEntry, false);
throw new ExecutionException(ex.getMessage(), ex);
}
} finally {
lock.unlock();
}
}
@Override

View File

@ -35,6 +35,7 @@ import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.locks.ReentrantLock;
import org.apache.hc.core5.annotation.Contract;
import org.apache.hc.core5.annotation.ThreadingBehavior;
@ -52,6 +53,8 @@ public final class PublicSuffixMatcherLoader {
private static final Logger LOG = LoggerFactory.getLogger(PublicSuffixMatcherLoader.class);
private static final ReentrantLock lock = new ReentrantLock();
private static PublicSuffixMatcher load(final InputStream in) throws IOException {
final List<PublicSuffixList> lists = PublicSuffixListParser.INSTANCE.parseByType(
new InputStreamReader(in, StandardCharsets.UTF_8));
@ -76,7 +79,8 @@ public final class PublicSuffixMatcherLoader {
public static PublicSuffixMatcher getDefault() {
if (DEFAULT_INSTANCE == null) {
synchronized (PublicSuffixMatcherLoader.class) {
lock.lock();
try {
if (DEFAULT_INSTANCE == null){
final URL url = PublicSuffixMatcherLoader.class.getResource(
"/mozilla/public-suffix-list.txt");
@ -91,6 +95,8 @@ public final class PublicSuffixMatcherLoader {
DEFAULT_INSTANCE = new PublicSuffixMatcher(DomainType.ICANN, Collections.singletonList("com"), null);
}
}
} finally {
lock.unlock();
}
}
return DEFAULT_INSTANCE;