HBASE-18085 Prevent parallel purge in ObjectPool
This commit is contained in:
parent
ebe92c8fb3
commit
d047cc9ecc
|
@ -22,6 +22,8 @@ import java.lang.ref.Reference;
|
|||
import java.lang.ref.ReferenceQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
|
@ -52,6 +54,9 @@ public abstract class ObjectPool<K, V> {
|
|||
/** Does not permit null keys. */
|
||||
protected final ConcurrentMap<K, Reference<V>> referenceCache;
|
||||
|
||||
/** For preventing parallel purge */
|
||||
private final Lock purgeLock = new ReentrantLock();
|
||||
|
||||
/**
|
||||
* The default initial capacity,
|
||||
* used when not otherwise specified in a constructor.
|
||||
|
@ -117,12 +122,29 @@ public abstract class ObjectPool<K, V> {
|
|||
}
|
||||
|
||||
/**
|
||||
* Removes stale references of shared objects from the pool.
|
||||
* References newly becoming stale may still remain.
|
||||
* The implementation of this method is expected to be lightweight
|
||||
* when there is no stale reference.
|
||||
* Removes stale references of shared objects from the pool. References newly becoming stale may
|
||||
* still remain.
|
||||
* <p/>
|
||||
* The implementation of this method is expected to be lightweight when there is no stale
|
||||
* reference with the Oracle (Sun) implementation of {@code ReferenceQueue}, because
|
||||
* {@code ReferenceQueue.poll} just checks a volatile instance variable in {@code ReferenceQueue}.
|
||||
*/
|
||||
public abstract void purge();
|
||||
public void purge() {
|
||||
if (purgeLock.tryLock()) {// no parallel purge
|
||||
try {
|
||||
while (true) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Reference<V> ref = (Reference<V>) staleRefQueue.poll();
|
||||
if (ref == null) {
|
||||
break;
|
||||
}
|
||||
referenceCache.remove(getReferenceKey(ref), ref);
|
||||
}
|
||||
} finally {
|
||||
purgeLock.unlock();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a reference associated with the given object
|
||||
|
@ -132,6 +154,13 @@ public abstract class ObjectPool<K, V> {
|
|||
*/
|
||||
public abstract Reference<V> createReference(K key, V obj);
|
||||
|
||||
/**
|
||||
* Get key of the given reference
|
||||
* @param ref The reference
|
||||
* @return key of the reference
|
||||
*/
|
||||
public abstract K getReferenceKey(Reference<V> ref);
|
||||
|
||||
/**
|
||||
* Returns a shared object associated with the given {@code key},
|
||||
* which is identified by the {@code equals} method.
|
||||
|
|
|
@ -48,22 +48,6 @@ public class SoftObjectPool<K, V> extends ObjectPool<K, V> {
|
|||
super(objectFactory, initialCapacity, concurrencyLevel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void purge() {
|
||||
// This method is lightweight while there is no stale reference
|
||||
// with the Oracle (Sun) implementation of {@code ReferenceQueue},
|
||||
// because {@code ReferenceQueue.poll} just checks a volatile instance
|
||||
// variable in {@code ReferenceQueue}.
|
||||
while (true) {
|
||||
@SuppressWarnings("unchecked")
|
||||
SoftObjectReference ref = (SoftObjectReference) staleRefQueue.poll();
|
||||
if (ref == null) {
|
||||
break;
|
||||
}
|
||||
referenceCache.remove(ref.key, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reference<V> createReference(K key, V obj) {
|
||||
return new SoftObjectReference(key, obj);
|
||||
|
@ -78,4 +62,9 @@ public class SoftObjectPool<K, V> extends ObjectPool<K, V> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public K getReferenceKey(Reference<V> ref) {
|
||||
return ((SoftObjectReference) ref).key;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
@ -48,22 +48,6 @@ public class WeakObjectPool<K,V> extends ObjectPool<K,V> {
|
|||
super(objectFactory, initialCapacity, concurrencyLevel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void purge() {
|
||||
// This method is lightweight while there is no stale reference
|
||||
// with the Oracle (Sun) implementation of {@code ReferenceQueue},
|
||||
// because {@code ReferenceQueue.poll} just checks a volatile instance
|
||||
// variable in {@code ReferenceQueue}.
|
||||
while (true) {
|
||||
@SuppressWarnings("unchecked")
|
||||
WeakObjectReference ref = (WeakObjectReference) staleRefQueue.poll();
|
||||
if (ref == null) {
|
||||
break;
|
||||
}
|
||||
referenceCache.remove(ref.key, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reference<V> createReference(K key, V obj) {
|
||||
return new WeakObjectReference(key, obj);
|
||||
|
@ -78,4 +62,9 @@ public class WeakObjectPool<K,V> extends ObjectPool<K,V> {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public K getReferenceKey(Reference<V> ref) {
|
||||
return ((WeakObjectReference)ref).key;
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue