From c4fae375b0a2d724420deab6db5eba0930f2d4bf Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Wed, 20 Dec 2017 13:53:03 +0100 Subject: [PATCH] Make KeyedLock reentrant (#27920) Today we prevent that the same thread acquires the same lock more than once. This restriction is a relict form the early days of this concurrency construct and can be removed. --- .../common/util/concurrent/KeyedLock.java | 26 +++++++--- .../util/concurrent/KeyedLockTests.java | 48 +++++++++++++++++++ 2 files changed, 67 insertions(+), 7 deletions(-) diff --git a/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java b/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java index e9b67c65657..8efff4edf5c 100644 --- a/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java +++ b/core/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java @@ -32,28 +32,34 @@ import java.util.concurrent.locks.ReentrantLock; * created the first time they are acquired and removed if no thread hold the * lock. The latter is important to assure that the list of locks does not grow * infinitely. - * + * Note: this lock is reentrant * * */ -public class KeyedLock { +public final class KeyedLock { + private final ConcurrentMap map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private final boolean fair; /** + * Creates a new lock * @param fair Use fair locking, ie threads get the lock in the order they requested it */ public KeyedLock(boolean fair) { this.fair = fair; } + /** + * Creates a non-fair lock + */ public KeyedLock() { this(false); } - private final ConcurrentMap map = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); - + /** + * Acquires a lock for the given key. The key is compared by it's equals method not by object identity. The lock can be acquired + * by the same thread multiple times. The lock is released by closing the returned {@link Releasable}. + */ public Releasable acquire(T key) { - assert isHeldByCurrentThread(key) == false : "lock for " + key + " is already heald by this thread"; while (true) { KeyLock perNodeLock = map.get(key); if (perNodeLock == null) { @@ -73,6 +79,9 @@ public class KeyedLock { } } + /** + * Returns true iff the caller thread holds the lock for the given key + */ public boolean isHeldByCurrentThread(T key) { KeyLock lock = map.get(key); if (lock == null) { @@ -81,7 +90,7 @@ public class KeyedLock { return lock.isHeldByCurrentThread(); } - void release(T key, KeyLock lock) { + private void release(T key, KeyLock lock) { assert lock == map.get(key); lock.unlock(); int decrementAndGet = lock.count.decrementAndGet(); @@ -118,8 +127,11 @@ public class KeyedLock { private final AtomicInteger count = new AtomicInteger(1); } + /** + * Returns true if this lock has at least one locked key. + */ public boolean hasLockedKeys() { - return !map.isEmpty(); + return map.isEmpty() == false; } } diff --git a/core/src/test/java/org/elasticsearch/common/util/concurrent/KeyedLockTests.java b/core/src/test/java/org/elasticsearch/common/util/concurrent/KeyedLockTests.java index 97a07ddc3fb..4d8fc8d3ecc 100644 --- a/core/src/test/java/org/elasticsearch/common/util/concurrent/KeyedLockTests.java +++ b/core/src/test/java/org/elasticsearch/common/util/concurrent/KeyedLockTests.java @@ -24,6 +24,9 @@ import org.elasticsearch.common.util.concurrent.KeyedLock; import org.elasticsearch.test.ESTestCase; import org.hamcrest.Matchers; +import java.util.Arrays; +import java.util.Collections; +import java.util.List; import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; @@ -67,6 +70,45 @@ public class KeyedLockTests extends ESTestCase { } } + public void testHasLockedKeys() { + KeyedLock lock = new KeyedLock<>(); + assertFalse(lock.hasLockedKeys()); + Releasable foo = lock.acquire("foo"); + assertTrue(lock.hasLockedKeys()); + foo.close(); + assertFalse(lock.hasLockedKeys()); + } + + public void testLockIsReentrant() throws InterruptedException { + KeyedLock lock = new KeyedLock<>(); + Releasable foo = lock.acquire("foo"); + assertTrue(lock.isHeldByCurrentThread("foo")); + assertFalse(lock.isHeldByCurrentThread("bar")); + Releasable foo2 = lock.acquire("foo"); + AtomicInteger test = new AtomicInteger(0); + CountDownLatch latch = new CountDownLatch(1); + Thread t = new Thread(() -> { + latch.countDown(); + try (Releasable r = lock.acquire("foo")) { + test.incrementAndGet(); + } + + }); + t.start(); + latch.await(); + Thread.yield(); + assertEquals(0, test.get()); + List list = Arrays.asList(foo, foo2); + Collections.shuffle(list, random()); + list.get(0).close(); + Thread.yield(); + assertEquals(0, test.get()); + list.get(1).close(); + t.join(); + assertEquals(1, test.get()); + assertFalse(lock.hasLockedKeys()); + } + public static class AcquireAndReleaseThread extends Thread { private CountDownLatch startLatch; @@ -98,6 +140,12 @@ public class KeyedLockTests extends ESTestCase { try (Releasable ignored = connectionLock.acquire(curName)) { assert connectionLock.isHeldByCurrentThread(curName); assert connectionLock.isHeldByCurrentThread(curName + "bla") == false; + if (randomBoolean()) { + try (Releasable reentrantIgnored = connectionLock.acquire(curName)) { + // just acquire this and make sure we can :) + Thread.yield(); + } + } Integer integer = counter.get(curName); if (integer == null) { counter.put(curName, 1);