Make KeyedLock reentrant (#27920)
Today we prevent that the same thread acquires the same lock more than once. This restriction is a relict form the early days of this concurrency construct and can be removed.
This commit is contained in:
parent
4585cc8312
commit
c4fae375b0
|
@ -32,28 +32,34 @@ import java.util.concurrent.locks.ReentrantLock;
|
||||||
* created the first time they are acquired and removed if no thread hold the
|
* created the first time they are acquired and removed if no thread hold the
|
||||||
* lock. The latter is important to assure that the list of locks does not grow
|
* lock. The latter is important to assure that the list of locks does not grow
|
||||||
* infinitely.
|
* infinitely.
|
||||||
*
|
* Note: this lock is reentrant
|
||||||
*
|
*
|
||||||
* */
|
* */
|
||||||
public class KeyedLock<T> {
|
public final class KeyedLock<T> {
|
||||||
|
|
||||||
|
private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
||||||
private final boolean fair;
|
private final boolean fair;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* Creates a new lock
|
||||||
* @param fair Use fair locking, ie threads get the lock in the order they requested it
|
* @param fair Use fair locking, ie threads get the lock in the order they requested it
|
||||||
*/
|
*/
|
||||||
public KeyedLock(boolean fair) {
|
public KeyedLock(boolean fair) {
|
||||||
this.fair = fair;
|
this.fair = fair;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a non-fair lock
|
||||||
|
*/
|
||||||
public KeyedLock() {
|
public KeyedLock() {
|
||||||
this(false);
|
this(false);
|
||||||
}
|
}
|
||||||
|
|
||||||
private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency();
|
/**
|
||||||
|
* Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired
|
||||||
|
* by the same thread multiple times. The lock is released by closing the returned {@link Releasable}.
|
||||||
|
*/
|
||||||
public Releasable acquire(T key) {
|
public Releasable acquire(T key) {
|
||||||
assert isHeldByCurrentThread(key) == false : "lock for " + key + " is already heald by this thread";
|
|
||||||
while (true) {
|
while (true) {
|
||||||
KeyLock perNodeLock = map.get(key);
|
KeyLock perNodeLock = map.get(key);
|
||||||
if (perNodeLock == null) {
|
if (perNodeLock == null) {
|
||||||
|
@ -73,6 +79,9 @@ public class KeyedLock<T> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns <code>true</code> iff the caller thread holds the lock for the given key
|
||||||
|
*/
|
||||||
public boolean isHeldByCurrentThread(T key) {
|
public boolean isHeldByCurrentThread(T key) {
|
||||||
KeyLock lock = map.get(key);
|
KeyLock lock = map.get(key);
|
||||||
if (lock == null) {
|
if (lock == null) {
|
||||||
|
@ -81,7 +90,7 @@ public class KeyedLock<T> {
|
||||||
return lock.isHeldByCurrentThread();
|
return lock.isHeldByCurrentThread();
|
||||||
}
|
}
|
||||||
|
|
||||||
void release(T key, KeyLock lock) {
|
private void release(T key, KeyLock lock) {
|
||||||
assert lock == map.get(key);
|
assert lock == map.get(key);
|
||||||
lock.unlock();
|
lock.unlock();
|
||||||
int decrementAndGet = lock.count.decrementAndGet();
|
int decrementAndGet = lock.count.decrementAndGet();
|
||||||
|
@ -118,8 +127,11 @@ public class KeyedLock<T> {
|
||||||
private final AtomicInteger count = new AtomicInteger(1);
|
private final AtomicInteger count = new AtomicInteger(1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns <code>true</code> if this lock has at least one locked key.
|
||||||
|
*/
|
||||||
public boolean hasLockedKeys() {
|
public boolean hasLockedKeys() {
|
||||||
return !map.isEmpty();
|
return map.isEmpty() == false;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
@ -24,6 +24,9 @@ import org.elasticsearch.common.util.concurrent.KeyedLock;
|
||||||
import org.elasticsearch.test.ESTestCase;
|
import org.elasticsearch.test.ESTestCase;
|
||||||
import org.hamcrest.Matchers;
|
import org.hamcrest.Matchers;
|
||||||
|
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.Collections;
|
||||||
|
import java.util.List;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
import java.util.concurrent.ConcurrentHashMap;
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
@ -67,6 +70,45 @@ public class KeyedLockTests extends ESTestCase {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testHasLockedKeys() {
|
||||||
|
KeyedLock<String> lock = new KeyedLock<>();
|
||||||
|
assertFalse(lock.hasLockedKeys());
|
||||||
|
Releasable foo = lock.acquire("foo");
|
||||||
|
assertTrue(lock.hasLockedKeys());
|
||||||
|
foo.close();
|
||||||
|
assertFalse(lock.hasLockedKeys());
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testLockIsReentrant() throws InterruptedException {
|
||||||
|
KeyedLock<String> lock = new KeyedLock<>();
|
||||||
|
Releasable foo = lock.acquire("foo");
|
||||||
|
assertTrue(lock.isHeldByCurrentThread("foo"));
|
||||||
|
assertFalse(lock.isHeldByCurrentThread("bar"));
|
||||||
|
Releasable foo2 = lock.acquire("foo");
|
||||||
|
AtomicInteger test = new AtomicInteger(0);
|
||||||
|
CountDownLatch latch = new CountDownLatch(1);
|
||||||
|
Thread t = new Thread(() -> {
|
||||||
|
latch.countDown();
|
||||||
|
try (Releasable r = lock.acquire("foo")) {
|
||||||
|
test.incrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
});
|
||||||
|
t.start();
|
||||||
|
latch.await();
|
||||||
|
Thread.yield();
|
||||||
|
assertEquals(0, test.get());
|
||||||
|
List<Releasable> list = Arrays.asList(foo, foo2);
|
||||||
|
Collections.shuffle(list, random());
|
||||||
|
list.get(0).close();
|
||||||
|
Thread.yield();
|
||||||
|
assertEquals(0, test.get());
|
||||||
|
list.get(1).close();
|
||||||
|
t.join();
|
||||||
|
assertEquals(1, test.get());
|
||||||
|
assertFalse(lock.hasLockedKeys());
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
public static class AcquireAndReleaseThread extends Thread {
|
public static class AcquireAndReleaseThread extends Thread {
|
||||||
private CountDownLatch startLatch;
|
private CountDownLatch startLatch;
|
||||||
|
@ -98,6 +140,12 @@ public class KeyedLockTests extends ESTestCase {
|
||||||
try (Releasable ignored = connectionLock.acquire(curName)) {
|
try (Releasable ignored = connectionLock.acquire(curName)) {
|
||||||
assert connectionLock.isHeldByCurrentThread(curName);
|
assert connectionLock.isHeldByCurrentThread(curName);
|
||||||
assert connectionLock.isHeldByCurrentThread(curName + "bla") == false;
|
assert connectionLock.isHeldByCurrentThread(curName + "bla") == false;
|
||||||
|
if (randomBoolean()) {
|
||||||
|
try (Releasable reentrantIgnored = connectionLock.acquire(curName)) {
|
||||||
|
// just acquire this and make sure we can :)
|
||||||
|
Thread.yield();
|
||||||
|
}
|
||||||
|
}
|
||||||
Integer integer = counter.get(curName);
|
Integer integer = counter.get(curName);
|
||||||
if (integer == null) {
|
if (integer == null) {
|
||||||
counter.put(curName, 1);
|
counter.put(curName, 1);
|
||||||
|
|
Loading…
Reference in New Issue