HBASE-14268 Improve KeyLocker (Hiroshi Ikeda)
This commit is contained in:
parent
a45cb72ef2
commit
99e99f3b54
|
@ -18,49 +18,45 @@
|
||||||
|
|
||||||
package org.apache.hadoop.hbase.util;
|
package org.apache.hadoop.hbase.util;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.HashMap;
|
import java.util.LinkedHashMap;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.SortedSet;
|
|
||||||
import java.util.TreeSet;
|
|
||||||
import java.util.concurrent.atomic.AtomicInteger;
|
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
import org.apache.hadoop.hbase.classification.InterfaceAudience;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A utility class to manage a set of locks. Each lock is identified by a String which serves
|
* A utility class to manage a set of locks. Each lock is identified by a String which serves
|
||||||
* as a key. Typical usage is: <p>
|
* as a key. Typical usage is: <pre>
|
||||||
* class Example{
|
* class Example {
|
||||||
* private final static KeyLocker<String> locker = new Locker<String>();
|
* private final static KeyLocker<String> locker = new Locker<String>();
|
||||||
* </p>
|
* public void foo(String s){
|
||||||
* <p>
|
* Lock lock = locker.acquireLock(s);
|
||||||
* public void foo(String s){
|
* try {
|
||||||
* Lock lock = locker.acquireLock(s);
|
* // whatever
|
||||||
* try {
|
* }finally{
|
||||||
* // whatever
|
* lock.unlock();
|
||||||
* }finally{
|
* }
|
||||||
* lock.unlock();
|
* }
|
||||||
* }
|
* }
|
||||||
* }
|
* </pre>
|
||||||
* }
|
|
||||||
* </p>
|
|
||||||
*/
|
*/
|
||||||
@InterfaceAudience.Private
|
@InterfaceAudience.Private
|
||||||
public class KeyLocker<K extends Comparable<? super K>> {
|
public class KeyLocker<K> {
|
||||||
private static final Log LOG = LogFactory.getLog(KeyLocker.class);
|
|
||||||
|
|
||||||
// The number of lock we want to easily support. It's not a maximum.
|
// The number of lock we want to easily support. It's not a maximum.
|
||||||
private static final int NB_CONCURRENT_LOCKS = 1000;
|
private static final int NB_CONCURRENT_LOCKS = 1000;
|
||||||
|
|
||||||
// We need an atomic counter to manage the number of users using the lock and free it when
|
private final WeakObjectPool<K, ReentrantLock> lockPool =
|
||||||
// it's equal to zero.
|
new WeakObjectPool<K, ReentrantLock>(
|
||||||
private final Map<K, Pair<KeyLock<K>, AtomicInteger>> locks =
|
new WeakObjectPool.ObjectFactory<K, ReentrantLock>() {
|
||||||
new HashMap<K, Pair<KeyLock<K>, AtomicInteger>>(NB_CONCURRENT_LOCKS);
|
@Override
|
||||||
|
public ReentrantLock createObject(K key) {
|
||||||
|
return new ReentrantLock();
|
||||||
|
}
|
||||||
|
},
|
||||||
|
NB_CONCURRENT_LOCKS);
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return a lock for the given key. The lock is already locked.
|
* Return a lock for the given key. The lock is already locked.
|
||||||
|
@ -70,67 +66,36 @@ public class KeyLocker<K extends Comparable<? super K>> {
|
||||||
public ReentrantLock acquireLock(K key) {
|
public ReentrantLock acquireLock(K key) {
|
||||||
if (key == null) throw new IllegalArgumentException("key must not be null");
|
if (key == null) throw new IllegalArgumentException("key must not be null");
|
||||||
|
|
||||||
Pair<KeyLock<K>, AtomicInteger> lock;
|
lockPool.purge();
|
||||||
synchronized (this) {
|
ReentrantLock lock = lockPool.get(key);
|
||||||
lock = locks.get(key);
|
|
||||||
if (lock == null) {
|
lock.lock();
|
||||||
lock = new Pair<KeyLock<K>, AtomicInteger>(
|
return lock;
|
||||||
new KeyLock<K>(this, key), new AtomicInteger(1));
|
|
||||||
locks.put(key, lock);
|
|
||||||
} else {
|
|
||||||
lock.getSecond().incrementAndGet();
|
|
||||||
}
|
|
||||||
}
|
|
||||||
lock.getFirst().lock();
|
|
||||||
return lock.getFirst();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Acquire locks for a set of keys. The keys will be
|
* Acquire locks for a set of keys. The keys will be
|
||||||
* sorted internally to avoid possible deadlock.
|
* sorted internally to avoid possible deadlock.
|
||||||
|
*
|
||||||
|
* @throw ClassCastException if the given {@code keys}
|
||||||
|
* contains elements that are not mutually comparable
|
||||||
*/
|
*/
|
||||||
public Map<K, Lock> acquireLocks(final Set<K> keys) {
|
public Map<K, Lock> acquireLocks(Set<? extends K> keys) {
|
||||||
Map<K, Lock> locks = new HashMap<K, Lock>(keys.size());
|
Object[] keyArray = keys.toArray();
|
||||||
SortedSet<K> sortedKeys = new TreeSet<K>(keys);
|
Arrays.sort(keyArray);
|
||||||
for (K key : sortedKeys) {
|
|
||||||
locks.put(key, acquireLock(key));
|
lockPool.purge();
|
||||||
|
Map<K, Lock> locks = new LinkedHashMap<K, Lock>(keyArray.length);
|
||||||
|
for (Object o : keyArray) {
|
||||||
|
@SuppressWarnings("unchecked")
|
||||||
|
K key = (K)o;
|
||||||
|
ReentrantLock lock = lockPool.get(key);
|
||||||
|
locks.put(key, lock);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (Lock lock : locks.values()) {
|
||||||
|
lock.lock();
|
||||||
}
|
}
|
||||||
return locks;
|
return locks;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Free the lock for the given key.
|
|
||||||
*/
|
|
||||||
private synchronized void releaseLock(K key) {
|
|
||||||
Pair<KeyLock<K>, AtomicInteger> lock = locks.get(key);
|
|
||||||
if (lock != null) {
|
|
||||||
if (lock.getSecond().decrementAndGet() == 0) {
|
|
||||||
locks.remove(key);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
String message = "Can't release the lock for " + key+", this key is not in the key list." +
|
|
||||||
" known keys are: "+ locks.keySet();
|
|
||||||
LOG.error(message);
|
|
||||||
throw new RuntimeException(message);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
static class KeyLock<K extends Comparable<? super K>> extends ReentrantLock {
|
|
||||||
private static final long serialVersionUID = -12432857283423584L;
|
|
||||||
|
|
||||||
private final transient KeyLocker<K> locker;
|
|
||||||
private final K lockId;
|
|
||||||
|
|
||||||
private KeyLock(KeyLocker<K> locker, K lockId) {
|
|
||||||
super();
|
|
||||||
this.locker = locker;
|
|
||||||
this.lockId = lockId;
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public void unlock() {
|
|
||||||
super.unlock();
|
|
||||||
locker.releaseLock(lockId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -30,7 +30,7 @@ import org.junit.experimental.categories.Category;
|
||||||
public class TestKeyLocker {
|
public class TestKeyLocker {
|
||||||
@Test
|
@Test
|
||||||
public void testLocker(){
|
public void testLocker(){
|
||||||
KeyLocker<String> locker = new KeyLocker();
|
KeyLocker<String> locker = new KeyLocker<String>();
|
||||||
ReentrantLock lock1 = locker.acquireLock("l1");
|
ReentrantLock lock1 = locker.acquireLock("l1");
|
||||||
Assert.assertTrue(lock1.isHeldByCurrentThread());
|
Assert.assertTrue(lock1.isHeldByCurrentThread());
|
||||||
|
|
||||||
|
@ -51,9 +51,19 @@ public class TestKeyLocker {
|
||||||
lock2.unlock();
|
lock2.unlock();
|
||||||
Assert.assertFalse(lock20.isHeldByCurrentThread());
|
Assert.assertFalse(lock20.isHeldByCurrentThread());
|
||||||
|
|
||||||
// The lock object was freed once useless, so we're recreating a new one
|
// The lock object will be garbage-collected
|
||||||
|
// if you free its reference for a long time,
|
||||||
|
// and you will get a new one at the next time.
|
||||||
|
int lock2Hash = System.identityHashCode(lock2);
|
||||||
|
lock2 = null;
|
||||||
|
lock20 = null;
|
||||||
|
|
||||||
|
System.gc();
|
||||||
|
System.gc();
|
||||||
|
System.gc();
|
||||||
|
|
||||||
ReentrantLock lock200 = locker.acquireLock("l2");
|
ReentrantLock lock200 = locker.acquireLock("l2");
|
||||||
Assert.assertTrue(lock2 != lock200);
|
Assert.assertNotEquals(lock2Hash, System.identityHashCode(lock200));
|
||||||
lock200.unlock();
|
lock200.unlock();
|
||||||
Assert.assertFalse(lock200.isHeldByCurrentThread());
|
Assert.assertFalse(lock200.isHeldByCurrentThread());
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue