diff --git a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java index d4354d82e46..40570f060e6 100644 --- a/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java +++ b/core/src/main/java/org/apache/druid/java/util/http/client/pool/ResourcePool.java @@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils; import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.logger.Logger; +import javax.annotation.Nullable; import java.io.Closeable; import java.io.IOException; import java.util.ArrayDeque; @@ -38,6 +39,16 @@ import java.util.concurrent.ExecutionException; import java.util.concurrent.atomic.AtomicBoolean; /** + * A resource pool based on {@link LoadingCache}. When a resource is first requested for a new key, + * all {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and cached in the {@link #pool}. + * The individual resource in {@link ImmediateCreationResourceHolder} is valid while (current time - last access time) + * <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}. + * + * A resource is closed and reinitialized if {@link ResourceFactory#isGood} returns false or it's expired based on + * {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}. + * + * {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number of resources per cache entry. The total + * number of resources in {@link ImmediateCreationResourceHolder} cannot be larger than the limit in any case. */ public class ResourcePool implements Closeable { @@ -45,10 +56,7 @@ public class ResourcePool implements Closeable private final LoadingCache> pool; private final AtomicBoolean closed = new AtomicBoolean(false); - public ResourcePool( - final ResourceFactory factory, - final ResourcePoolConfig config - ) + public ResourcePool(final ResourceFactory factory, final ResourcePoolConfig config) { this.pool = CacheBuilder.newBuilder().build( new CacheLoader>() @@ -56,7 +64,7 @@ public class ResourcePool implements Closeable @Override public ImmediateCreationResourceHolder load(K input) { - return new ImmediateCreationResourceHolder( + return new ImmediateCreationResourceHolder<>( config.getMaxPerKey(), config.getUnusedConnectionTimeoutMillis(), input, @@ -67,6 +75,10 @@ public class ResourcePool implements Closeable ); } + /** + * Returns a {@link ResourceContainer} for the given key or null if this pool is already closed. + */ + @Nullable public ResourceContainer take(final K key) { if (closed.get()) { @@ -166,16 +178,22 @@ public class ResourcePool implements Closeable this.resourceHolderList = new ArrayDeque<>(); for (int i = 0; i < maxSize; ++i) { - resourceHolderList.add(new ResourceHolder<>( - System.currentTimeMillis(), - Preconditions.checkNotNull( - factory.generate(key), - "factory.generate(key)" + resourceHolderList.add( + new ResourceHolder<>( + System.currentTimeMillis(), + Preconditions.checkNotNull( + factory.generate(key), + "factory.generate(key)" + ) ) - )); + ); } } + /** + * Returns a resource or null if this holder is already closed or the current thread is interrupted. + */ + @Nullable V get() { // resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource. @@ -186,7 +204,7 @@ public class ResourcePool implements Closeable this.wait(); } catch (InterruptedException e) { - Thread.interrupted(); + Thread.currentThread().interrupt(); return null; } } @@ -293,16 +311,16 @@ public class ResourcePool implements Closeable private static class ResourceHolder { - private long lastAccessedTime; - private V resource; + private final long lastAccessedTime; + private final V resource; - public ResourceHolder(long lastAccessedTime, V resource) + private ResourceHolder(long lastAccessedTime, V resource) { this.resource = resource; this.lastAccessedTime = lastAccessedTime; } - public long getLastAccessedTime() + private long getLastAccessedTime() { return lastAccessedTime; }