From 98f1387f867835118cadf8b2bbc1e937a16de250 Mon Sep 17 00:00:00 2001
From: stack
Date: Mon, 12 Oct 2015 23:13:56 -0700
Subject: [PATCH] HBASE-14268 Improve KeyLocker (Hiroshi Ikeda)
---
.../apache/hadoop/hbase/util/KeyLocker.java | 129 +++++++-----------
.../hadoop/hbase/util/TestKeyLocker.java | 16 ++-
2 files changed, 60 insertions(+), 85 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
index 5398582c393..dec91aaf5fe 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/util/KeyLocker.java
@@ -18,49 +18,45 @@
package org.apache.hadoop.hbase.util;
-
-import java.util.HashMap;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.TreeSet;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.hbase.classification.InterfaceAudience;
/**
* A utility class to manage a set of locks. Each lock is identified by a String which serves
- * as a key. Typical usage is:
- * class Example{
- * private final static KeyLocker<String> locker = new Locker<String>();
- *
- *
- * public void foo(String s){
- * Lock lock = locker.acquireLock(s);
- * try {
- * // whatever
- * }finally{
- * lock.unlock();
+ * as a key. Typical usage is:
+ * class Example {
+ * private final static KeyLocker<String> locker = new Locker<String>();
+ * public void foo(String s){
+ * Lock lock = locker.acquireLock(s);
+ * try {
+ * // whatever
+ * }finally{
+ * lock.unlock();
+ * }
+ * }
* }
- * }
- * }
- *
+ *
*/
@InterfaceAudience.Private
-public class KeyLocker> {
- private static final Log LOG = LogFactory.getLog(KeyLocker.class);
-
+public class KeyLocker {
// The number of lock we want to easily support. It's not a maximum.
private static final int NB_CONCURRENT_LOCKS = 1000;
- // We need an atomic counter to manage the number of users using the lock and free it when
- // it's equal to zero.
- private final Map, AtomicInteger>> locks =
- new HashMap, AtomicInteger>>(NB_CONCURRENT_LOCKS);
+ private final WeakObjectPool lockPool =
+ new WeakObjectPool(
+ new WeakObjectPool.ObjectFactory() {
+ @Override
+ public ReentrantLock createObject(K key) {
+ return new ReentrantLock();
+ }
+ },
+ NB_CONCURRENT_LOCKS);
/**
* Return a lock for the given key. The lock is already locked.
@@ -70,67 +66,36 @@ public class KeyLocker> {
public ReentrantLock acquireLock(K key) {
if (key == null) throw new IllegalArgumentException("key must not be null");
- Pair, AtomicInteger> lock;
- synchronized (this) {
- lock = locks.get(key);
- if (lock == null) {
- lock = new Pair, AtomicInteger>(
- new KeyLock(this, key), new AtomicInteger(1));
- locks.put(key, lock);
- } else {
- lock.getSecond().incrementAndGet();
- }
- }
- lock.getFirst().lock();
- return lock.getFirst();
+ lockPool.purge();
+ ReentrantLock lock = lockPool.get(key);
+
+ lock.lock();
+ return lock;
}
/**
* Acquire locks for a set of keys. The keys will be
* sorted internally to avoid possible deadlock.
+ *
+ * @throw ClassCastException if the given {@code keys}
+ * contains elements that are not mutually comparable
*/
- public Map acquireLocks(final Set keys) {
- Map locks = new HashMap(keys.size());
- SortedSet sortedKeys = new TreeSet(keys);
- for (K key : sortedKeys) {
- locks.put(key, acquireLock(key));
+ public Map acquireLocks(Set extends K> keys) {
+ Object[] keyArray = keys.toArray();
+ Arrays.sort(keyArray);
+
+ lockPool.purge();
+ Map locks = new LinkedHashMap(keyArray.length);
+ for (Object o : keyArray) {
+ @SuppressWarnings("unchecked")
+ K key = (K)o;
+ ReentrantLock lock = lockPool.get(key);
+ locks.put(key, lock);
+ }
+
+ for (Lock lock : locks.values()) {
+ lock.lock();
}
return locks;
}
-
- /**
- * Free the lock for the given key.
- */
- private synchronized void releaseLock(K key) {
- Pair, AtomicInteger> lock = locks.get(key);
- if (lock != null) {
- if (lock.getSecond().decrementAndGet() == 0) {
- locks.remove(key);
- }
- } else {
- String message = "Can't release the lock for " + key+", this key is not in the key list." +
- " known keys are: "+ locks.keySet();
- LOG.error(message);
- throw new RuntimeException(message);
- }
- }
-
- static class KeyLock> extends ReentrantLock {
- private static final long serialVersionUID = -12432857283423584L;
-
- private final transient KeyLocker locker;
- private final K lockId;
-
- private KeyLock(KeyLocker locker, K lockId) {
- super();
- this.locker = locker;
- this.lockId = lockId;
- }
-
- @Override
- public void unlock() {
- super.unlock();
- locker.releaseLock(lockId);
- }
- }
}
diff --git a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
index c7eb7557cda..8fdd9eaae62 100644
--- a/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
+++ b/hbase-common/src/test/java/org/apache/hadoop/hbase/util/TestKeyLocker.java
@@ -29,7 +29,7 @@ import java.util.concurrent.locks.ReentrantLock;
public class TestKeyLocker {
@Test
public void testLocker(){
- KeyLocker locker = new KeyLocker();
+ KeyLocker locker = new KeyLocker();
ReentrantLock lock1 = locker.acquireLock("l1");
Assert.assertTrue(lock1.isHeldByCurrentThread());
@@ -50,9 +50,19 @@ public class TestKeyLocker {
lock2.unlock();
Assert.assertFalse(lock20.isHeldByCurrentThread());
- // The lock object was freed once useless, so we're recreating a new one
+ // The lock object will be garbage-collected
+ // if you free its reference for a long time,
+ // and you will get a new one at the next time.
+ int lock2Hash = System.identityHashCode(lock2);
+ lock2 = null;
+ lock20 = null;
+
+ System.gc();
+ System.gc();
+ System.gc();
+
ReentrantLock lock200 = locker.acquireLock("l2");
- Assert.assertTrue(lock2 != lock200);
+ Assert.assertNotEquals(lock2Hash, System.identityHashCode(lock200));
lock200.unlock();
Assert.assertFalse(lock200.isHeldByCurrentThread());