Fix handling interruptedException in resource pool (#9044)

This commit is contained in:
Jihoon Son 2019-12-16 09:41:13 -08:00 committed by GitHub
parent bc16ff5e7c
commit 298425a33a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 34 additions and 16 deletions

View File

@ -28,6 +28,7 @@ import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.io.Closer; import org.apache.druid.java.util.common.io.Closer;
import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.logger.Logger;
import javax.annotation.Nullable;
import java.io.Closeable; import java.io.Closeable;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayDeque; import java.util.ArrayDeque;
@ -38,6 +39,16 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
/** /**
* A resource pool based on {@link LoadingCache}. When a resource is first requested for a new key,
* all {@link ResourcePoolConfig#getMaxPerKey()} resources are initialized and cached in the {@link #pool}.
* The individual resource in {@link ImmediateCreationResourceHolder} is valid while (current time - last access time)
* <= {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* A resource is closed and reinitialized if {@link ResourceFactory#isGood} returns false or it's expired based on
* {@link ResourcePoolConfig#getUnusedConnectionTimeoutMillis()}.
*
* {@link ResourcePoolConfig#getMaxPerKey() is a hard limit for the max number of resources per cache entry. The total
* number of resources in {@link ImmediateCreationResourceHolder} cannot be larger than the limit in any case.
*/ */
public class ResourcePool<K, V> implements Closeable public class ResourcePool<K, V> implements Closeable
{ {
@ -45,10 +56,7 @@ public class ResourcePool<K, V> implements Closeable
private final LoadingCache<K, ImmediateCreationResourceHolder<K, V>> pool; private final LoadingCache<K, ImmediateCreationResourceHolder<K, V>> pool;
private final AtomicBoolean closed = new AtomicBoolean(false); private final AtomicBoolean closed = new AtomicBoolean(false);
public ResourcePool( public ResourcePool(final ResourceFactory<K, V> factory, final ResourcePoolConfig config)
final ResourceFactory<K, V> factory,
final ResourcePoolConfig config
)
{ {
this.pool = CacheBuilder.newBuilder().build( this.pool = CacheBuilder.newBuilder().build(
new CacheLoader<K, ImmediateCreationResourceHolder<K, V>>() new CacheLoader<K, ImmediateCreationResourceHolder<K, V>>()
@ -56,7 +64,7 @@ public class ResourcePool<K, V> implements Closeable
@Override @Override
public ImmediateCreationResourceHolder<K, V> load(K input) public ImmediateCreationResourceHolder<K, V> load(K input)
{ {
return new ImmediateCreationResourceHolder<K, V>( return new ImmediateCreationResourceHolder<>(
config.getMaxPerKey(), config.getMaxPerKey(),
config.getUnusedConnectionTimeoutMillis(), config.getUnusedConnectionTimeoutMillis(),
input, input,
@ -67,6 +75,10 @@ public class ResourcePool<K, V> implements Closeable
); );
} }
/**
* Returns a {@link ResourceContainer} for the given key or null if this pool is already closed.
*/
@Nullable
public ResourceContainer<V> take(final K key) public ResourceContainer<V> take(final K key)
{ {
if (closed.get()) { if (closed.get()) {
@ -166,16 +178,22 @@ public class ResourcePool<K, V> implements Closeable
this.resourceHolderList = new ArrayDeque<>(); this.resourceHolderList = new ArrayDeque<>();
for (int i = 0; i < maxSize; ++i) { for (int i = 0; i < maxSize; ++i) {
resourceHolderList.add(new ResourceHolder<>( resourceHolderList.add(
new ResourceHolder<>(
System.currentTimeMillis(), System.currentTimeMillis(),
Preconditions.checkNotNull( Preconditions.checkNotNull(
factory.generate(key), factory.generate(key),
"factory.generate(key)" "factory.generate(key)"
) )
)); )
);
} }
} }
/**
* Returns a resource or null if this holder is already closed or the current thread is interrupted.
*/
@Nullable
V get() V get()
{ {
// resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource. // resourceHolderList can't have nulls, so we'll use a null to signal that we need to create a new resource.
@ -186,7 +204,7 @@ public class ResourcePool<K, V> implements Closeable
this.wait(); this.wait();
} }
catch (InterruptedException e) { catch (InterruptedException e) {
Thread.interrupted(); Thread.currentThread().interrupt();
return null; return null;
} }
} }
@ -293,16 +311,16 @@ public class ResourcePool<K, V> implements Closeable
private static class ResourceHolder<V> private static class ResourceHolder<V>
{ {
private long lastAccessedTime; private final long lastAccessedTime;
private V resource; private final V resource;
public ResourceHolder(long lastAccessedTime, V resource) private ResourceHolder(long lastAccessedTime, V resource)
{ {
this.resource = resource; this.resource = resource;
this.lastAccessedTime = lastAccessedTime; this.lastAccessedTime = lastAccessedTime;
} }
public long getLastAccessedTime() private long getLastAccessedTime()
{ {
return lastAccessedTime; return lastAccessedTime;
} }