HBASE-17747 Support both weak and soft object pool
This commit is contained in:
parent
201c838250
commit
44b255889c
|
@ -50,7 +50,7 @@ public class KeyLocker<K> {
|
|||
|
||||
private final WeakObjectPool<K, ReentrantLock> lockPool =
|
||||
new WeakObjectPool<>(
|
||||
new WeakObjectPool.ObjectFactory<K, ReentrantLock>() {
|
||||
new ObjectPool.ObjectFactory<K, ReentrantLock>() {
|
||||
@Override
|
||||
public ReentrantLock createObject(K key) {
|
||||
return new ReentrantLock();
|
||||
|
|
|
@ -0,0 +1,174 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.ref.Reference;
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
|
||||
/**
|
||||
* A thread-safe shared object pool in which object creation is expected to be lightweight, and the
|
||||
* objects may be excessively created and discarded.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public abstract class ObjectPool<K, V> {
|
||||
/**
|
||||
* An {@code ObjectFactory} object is used to create
|
||||
* new shared objects on demand.
|
||||
*/
|
||||
public interface ObjectFactory<K, V> {
|
||||
/**
|
||||
* Creates a new shared object associated with the given {@code key},
|
||||
* identified by the {@code equals} method.
|
||||
* This method may be simultaneously called by multiple threads
|
||||
* with the same key, and the excessive objects are just discarded.
|
||||
*/
|
||||
V createObject(K key);
|
||||
}
|
||||
|
||||
protected final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
|
||||
|
||||
private final ObjectFactory<K, V> objectFactory;
|
||||
|
||||
/** Does not permit null keys. */
|
||||
protected final ConcurrentMap<K, Reference<V>> referenceCache;
|
||||
|
||||
/**
|
||||
* The default initial capacity,
|
||||
* used when not otherwise specified in a constructor.
|
||||
*/
|
||||
public static final int DEFAULT_INITIAL_CAPACITY = 16;
|
||||
|
||||
/**
|
||||
* The default concurrency level,
|
||||
* used when not otherwise specified in a constructor.
|
||||
*/
|
||||
public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
|
||||
|
||||
/**
|
||||
* Creates a new pool with the default initial capacity (16)
|
||||
* and the default concurrency level (16).
|
||||
*
|
||||
* @param objectFactory the factory to supply new objects on demand
|
||||
*
|
||||
* @throws NullPointerException if {@code objectFactory} is null
|
||||
*/
|
||||
public ObjectPool(ObjectFactory<K, V> objectFactory) {
|
||||
this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new pool with the given initial capacity
|
||||
* and the default concurrency level (16).
|
||||
*
|
||||
* @param objectFactory the factory to supply new objects on demand
|
||||
* @param initialCapacity the initial capacity to keep objects in the pool
|
||||
*
|
||||
* @throws NullPointerException if {@code objectFactory} is null
|
||||
* @throws IllegalArgumentException if {@code initialCapacity} is negative
|
||||
*/
|
||||
public ObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
|
||||
this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new pool with the given initial capacity
|
||||
* and the given concurrency level.
|
||||
*
|
||||
* @param objectFactory the factory to supply new objects on demand
|
||||
* @param initialCapacity the initial capacity to keep objects in the pool
|
||||
* @param concurrencyLevel the estimated count of concurrently accessing threads
|
||||
*
|
||||
* @throws NullPointerException if {@code objectFactory} is null
|
||||
* @throws IllegalArgumentException if {@code initialCapacity} is negative or
|
||||
* {@code concurrencyLevel} is non-positive
|
||||
*/
|
||||
public ObjectPool(
|
||||
ObjectFactory<K, V> objectFactory,
|
||||
int initialCapacity,
|
||||
int concurrencyLevel) {
|
||||
|
||||
if (objectFactory == null) {
|
||||
throw new NullPointerException("Given object factory instance is NULL");
|
||||
}
|
||||
this.objectFactory = objectFactory;
|
||||
|
||||
this.referenceCache =
|
||||
new ConcurrentHashMap<K, Reference<V>>(initialCapacity, 0.75f, concurrencyLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes stale references of shared objects from the pool.
|
||||
* References newly becoming stale may still remain.
|
||||
* The implementation of this method is expected to be lightweight
|
||||
* when there is no stale reference.
|
||||
*/
|
||||
public abstract void purge();
|
||||
|
||||
/**
|
||||
* Create a reference associated with the given object
|
||||
* @param key the key to store in the reference
|
||||
* @param obj the object to associate with
|
||||
* @return the reference instance
|
||||
*/
|
||||
public abstract Reference<V> createReference(K key, V obj);
|
||||
|
||||
/**
|
||||
* Returns a shared object associated with the given {@code key},
|
||||
* which is identified by the {@code equals} method.
|
||||
* @throws NullPointerException if {@code key} is null
|
||||
*/
|
||||
public V get(K key) {
|
||||
Reference<V> ref = referenceCache.get(key);
|
||||
if (ref != null) {
|
||||
V obj = ref.get();
|
||||
if (obj != null) {
|
||||
return obj;
|
||||
}
|
||||
referenceCache.remove(key, ref);
|
||||
}
|
||||
|
||||
V newObj = objectFactory.createObject(key);
|
||||
Reference<V> newRef = createReference(key, newObj);
|
||||
while (true) {
|
||||
Reference<V> existingRef = referenceCache.putIfAbsent(key, newRef);
|
||||
if (existingRef == null) {
|
||||
return newObj;
|
||||
}
|
||||
|
||||
V existingObject = existingRef.get();
|
||||
if (existingObject != null) {
|
||||
return existingObject;
|
||||
}
|
||||
referenceCache.remove(key, existingRef);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an estimated count of objects kept in the pool.
|
||||
* This also counts stale references,
|
||||
* and you might want to call {@link #purge()} beforehand.
|
||||
*/
|
||||
public int size() {
|
||||
return referenceCache.size();
|
||||
}
|
||||
}
|
|
@ -0,0 +1,81 @@
|
|||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.ref.Reference;
|
||||
import java.lang.ref.SoftReference;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
|
||||
|
||||
/**
|
||||
* A {@code SoftReference} based shared object pool.
|
||||
* The objects are kept in soft references and
|
||||
* associated with keys which are identified by the {@code equals} method.
|
||||
* The objects are created by {@link ObjectFactory} on demand.
|
||||
* The object creation is expected to be lightweight,
|
||||
* and the objects may be excessively created and discarded.
|
||||
* Thread safe.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class SoftObjectPool<K, V> extends ObjectPool<K, V> {
|
||||
|
||||
public SoftObjectPool(ObjectFactory<K, V> objectFactory) {
|
||||
super(objectFactory);
|
||||
}
|
||||
|
||||
public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
|
||||
super(objectFactory, initialCapacity);
|
||||
}
|
||||
|
||||
public SoftObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity,
|
||||
int concurrencyLevel) {
|
||||
super(objectFactory, initialCapacity, concurrencyLevel);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void purge() {
|
||||
// This method is lightweight while there is no stale reference
|
||||
// with the Oracle (Sun) implementation of {@code ReferenceQueue},
|
||||
// because {@code ReferenceQueue.poll} just checks a volatile instance
|
||||
// variable in {@code ReferenceQueue}.
|
||||
while (true) {
|
||||
@SuppressWarnings("unchecked")
|
||||
SoftObjectReference ref = (SoftObjectReference) staleRefQueue.poll();
|
||||
if (ref == null) {
|
||||
break;
|
||||
}
|
||||
referenceCache.remove(ref.key, ref);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Reference<V> createReference(K key, V obj) {
|
||||
return new SoftObjectReference(key, obj);
|
||||
}
|
||||
|
||||
private class SoftObjectReference extends SoftReference<V> {
|
||||
final K key;
|
||||
|
||||
SoftObjectReference(K key, V obj) {
|
||||
super(obj, staleRefQueue);
|
||||
this.key = key;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
|
@ -1,4 +1,4 @@
|
|||
/*
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
|
@ -15,15 +15,13 @@
|
|||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.hbase.util;
|
||||
|
||||
import java.lang.ref.ReferenceQueue;
|
||||
import java.lang.ref.Reference;
|
||||
import java.lang.ref.WeakReference;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.ConcurrentMap;
|
||||
|
||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.hbase.util.ObjectPool.ObjectFactory;
|
||||
|
||||
/**
|
||||
* A {@code WeakReference} based shared object pool.
|
||||
|
@ -35,116 +33,30 @@ import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
|||
* Thread safe.
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class WeakObjectPool<K, V> {
|
||||
/**
|
||||
* An {@code ObjectFactory} object is used to create
|
||||
* new shared objects on demand.
|
||||
*/
|
||||
public interface ObjectFactory<K, V> {
|
||||
/**
|
||||
* Creates a new shared object associated with the given {@code key},
|
||||
* identified by the {@code equals} method.
|
||||
* This method may be simultaneously called by multiple threads
|
||||
* with the same key, and the excessive objects are just discarded.
|
||||
*/
|
||||
V createObject(K key);
|
||||
}
|
||||
public class WeakObjectPool<K,V> extends ObjectPool<K,V> {
|
||||
|
||||
private final ReferenceQueue<V> staleRefQueue = new ReferenceQueue<>();
|
||||
|
||||
private class ObjectReference extends WeakReference<V> {
|
||||
final K key;
|
||||
|
||||
ObjectReference(K key, V obj) {
|
||||
super(obj, staleRefQueue);
|
||||
this.key = key;
|
||||
}
|
||||
}
|
||||
|
||||
private final ObjectFactory<K, V> objectFactory;
|
||||
|
||||
/** Does not permit null keys. */
|
||||
private final ConcurrentMap<K, ObjectReference> referenceCache;
|
||||
|
||||
/**
|
||||
* The default initial capacity,
|
||||
* used when not otherwise specified in a constructor.
|
||||
*/
|
||||
public static final int DEFAULT_INITIAL_CAPACITY = 16;
|
||||
|
||||
/**
|
||||
* The default concurrency level,
|
||||
* used when not otherwise specified in a constructor.
|
||||
*/
|
||||
public static final int DEFAULT_CONCURRENCY_LEVEL = 16;
|
||||
|
||||
/**
|
||||
* Creates a new pool with the default initial capacity (16)
|
||||
* and the default concurrency level (16).
|
||||
*
|
||||
* @param objectFactory the factory to supply new objects on demand
|
||||
*
|
||||
* @throws NullPointerException if {@code objectFactory} is null
|
||||
*/
|
||||
public WeakObjectPool(ObjectFactory<K, V> objectFactory) {
|
||||
this(objectFactory, DEFAULT_INITIAL_CAPACITY, DEFAULT_CONCURRENCY_LEVEL);
|
||||
super(objectFactory);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new pool with the given initial capacity
|
||||
* and the default concurrency level (16).
|
||||
*
|
||||
* @param objectFactory the factory to supply new objects on demand
|
||||
* @param initialCapacity the initial capacity to keep objects in the pool
|
||||
*
|
||||
* @throws NullPointerException if {@code objectFactory} is null
|
||||
* @throws IllegalArgumentException if {@code initialCapacity} is negative
|
||||
*/
|
||||
public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity) {
|
||||
this(objectFactory, initialCapacity, DEFAULT_CONCURRENCY_LEVEL);
|
||||
super(objectFactory, initialCapacity);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new pool with the given initial capacity
|
||||
* and the given concurrency level.
|
||||
*
|
||||
* @param objectFactory the factory to supply new objects on demand
|
||||
* @param initialCapacity the initial capacity to keep objects in the pool
|
||||
* @param concurrencyLevel the estimated count of concurrently accessing threads
|
||||
*
|
||||
* @throws NullPointerException if {@code objectFactory} is null
|
||||
* @throws IllegalArgumentException if {@code initialCapacity} is negative or
|
||||
* {@code concurrencyLevel} is non-positive
|
||||
*/
|
||||
public WeakObjectPool(
|
||||
ObjectFactory<K, V> objectFactory,
|
||||
int initialCapacity,
|
||||
public WeakObjectPool(ObjectFactory<K, V> objectFactory, int initialCapacity,
|
||||
int concurrencyLevel) {
|
||||
|
||||
if (objectFactory == null) {
|
||||
throw new NullPointerException();
|
||||
}
|
||||
this.objectFactory = objectFactory;
|
||||
|
||||
this.referenceCache = new ConcurrentHashMap<>(initialCapacity, 0.75f, concurrencyLevel);
|
||||
// 0.75f is the default load factor threshold of ConcurrentHashMap.
|
||||
super(objectFactory, initialCapacity, concurrencyLevel);
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes stale references of shared objects from the pool.
|
||||
* References newly becoming stale may still remain.
|
||||
* The implementation of this method is expected to be lightweight
|
||||
* when there is no stale reference.
|
||||
*/
|
||||
@Override
|
||||
public void purge() {
|
||||
// This method is lightweight while there is no stale reference
|
||||
// with the Oracle (Sun) implementation of {@code ReferenceQueue},
|
||||
// because {@code ReferenceQueue.poll} just checks a volatile instance
|
||||
// variable in {@code ReferenceQueue}.
|
||||
|
||||
while (true) {
|
||||
@SuppressWarnings("unchecked")
|
||||
ObjectReference ref = (ObjectReference)staleRefQueue.poll();
|
||||
WeakObjectReference ref = (WeakObjectReference) staleRefQueue.poll();
|
||||
if (ref == null) {
|
||||
break;
|
||||
}
|
||||
|
@ -152,43 +64,18 @@ public class WeakObjectPool<K, V> {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns a shared object associated with the given {@code key},
|
||||
* which is identified by the {@code equals} method.
|
||||
* @throws NullPointerException if {@code key} is null
|
||||
*/
|
||||
public V get(K key) {
|
||||
ObjectReference ref = referenceCache.get(key);
|
||||
if (ref != null) {
|
||||
V obj = ref.get();
|
||||
if (obj != null) {
|
||||
return obj;
|
||||
}
|
||||
referenceCache.remove(key, ref);
|
||||
}
|
||||
@Override
|
||||
public Reference<V> createReference(K key, V obj) {
|
||||
return new WeakObjectReference(key, obj);
|
||||
}
|
||||
|
||||
V newObj = objectFactory.createObject(key);
|
||||
ObjectReference newRef = new ObjectReference(key, newObj);
|
||||
while (true) {
|
||||
ObjectReference existingRef = referenceCache.putIfAbsent(key, newRef);
|
||||
if (existingRef == null) {
|
||||
return newObj;
|
||||
}
|
||||
private class WeakObjectReference extends WeakReference<V> {
|
||||
final K key;
|
||||
|
||||
V existingObject = existingRef.get();
|
||||
if (existingObject != null) {
|
||||
return existingObject;
|
||||
}
|
||||
referenceCache.remove(key, existingRef);
|
||||
WeakObjectReference(K key, V obj) {
|
||||
super(obj, staleRefQueue);
|
||||
this.key = key;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an estimated count of objects kept in the pool.
|
||||
* This also counts stale references,
|
||||
* and you might want to call {@link #purge()} beforehand.
|
||||
*/
|
||||
public int size() {
|
||||
return referenceCache.size();
|
||||
}
|
||||
}
|
||||
|
|
|
@ -31,12 +31,12 @@ import org.junit.experimental.categories.Category;
|
|||
|
||||
@Category({MiscTests.class, SmallTests.class})
|
||||
public class TestWeakObjectPool {
|
||||
WeakObjectPool<String, Object> pool;
|
||||
ObjectPool<String, Object> pool;
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
pool = new WeakObjectPool<>(
|
||||
new WeakObjectPool.ObjectFactory<String, Object>() {
|
||||
new ObjectPool.ObjectFactory<String, Object>() {
|
||||
@Override
|
||||
public Object createObject(String key) {
|
||||
return new Object();
|
||||
|
|
|
@ -44,10 +44,11 @@ import com.google.common.annotations.VisibleForTesting;
|
|||
public class IdReadWriteLock {
|
||||
// The number of lock we want to easily support. It's not a maximum.
|
||||
private static final int NB_CONCURRENT_LOCKS = 1000;
|
||||
// The pool to get entry from, entries are mapped by weak reference to make it able to be
|
||||
// garbage-collected asap
|
||||
private final WeakObjectPool<Long, ReentrantReadWriteLock> lockPool = new WeakObjectPool<>(
|
||||
new WeakObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
|
||||
// The pool to get entry from, entries are mapped by soft reference and will be
|
||||
// automatically garbage-collected when JVM memory pressure is high
|
||||
private final ObjectPool<Long, ReentrantReadWriteLock> lockPool =
|
||||
new SoftObjectPool<>(
|
||||
new ObjectPool.ObjectFactory<Long, ReentrantReadWriteLock>() {
|
||||
@Override
|
||||
public ReentrantReadWriteLock createObject(Long id) {
|
||||
return new ReentrantReadWriteLock();
|
||||
|
|
|
@ -111,10 +111,11 @@ public class TestIdReadWriteLock {
|
|||
Future<Boolean> result = ecs.take();
|
||||
assertTrue(result.get());
|
||||
}
|
||||
// make sure the entry pool will be cleared after GC and purge call
|
||||
// make sure the entry pool won't be cleared when JVM memory is enough
|
||||
// even after GC and purge call
|
||||
int entryPoolSize = idLock.purgeAndGetEntryPoolSize();
|
||||
LOG.debug("Size of entry pool after gc and purge: " + entryPoolSize);
|
||||
assertEquals(0, entryPoolSize);
|
||||
assertEquals(NUM_IDS, entryPoolSize);
|
||||
} finally {
|
||||
exec.shutdown();
|
||||
exec.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||
|
|
Loading…
Reference in New Issue