From e30176cc6929ea35fb971a088e8514ff7ec47ff7 Mon Sep 17 00:00:00 2001 From: Simon Willnauer Date: Mon, 14 Jul 2014 16:52:20 +0200 Subject: [PATCH] Add read/write lock semantics to KeyedLock --- .../common/util/concurrent/KeyedLock.java | 59 +++++++++++++++-- .../transport/netty/KeyedLockTests.java | 65 +++++++++++++++---- 2 files changed, 107 insertions(+), 17 deletions(-) diff --git a/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java b/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java index 5f23418413f..862bc6d9645 100644 --- a/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java +++ b/src/main/java/org/elasticsearch/common/util/concurrent/KeyedLock.java @@ -23,7 +23,10 @@ import org.elasticsearch.ElasticsearchIllegalStateException; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This class manages locks. Locks can be accessed with an identifier and are @@ -34,18 +37,17 @@ import java.util.concurrent.locks.ReentrantLock; * A Thread can acquire a lock only once. * * */ - public class KeyedLock { private final ConcurrentMap map = ConcurrentCollections.newConcurrentMap(); - private final ThreadLocal threadLocal = new ThreadLocal<>(); + protected final ThreadLocal threadLocal = new ThreadLocal<>(); public void acquire(T key) { while (true) { if (threadLocal.get() != null) { // if we are here, the thread already has the lock - throw new ElasticsearchIllegalStateException("Lock already accquired in Thread" + Thread.currentThread().getId() + throw new ElasticsearchIllegalStateException("Lock already acquired in Thread" + Thread.currentThread().getId() + " for key " + key); } KeyLock perNodeLock = map.get(key); @@ -71,8 +73,12 @@ public class KeyedLock { public void release(T key) { KeyLock lock = threadLocal.get(); if (lock == null) { - throw new ElasticsearchIllegalStateException("Lock not accquired"); + throw new ElasticsearchIllegalStateException("Lock not acquired"); } + release(key, lock); + } + + void release(T key, KeyLock lock) { assert lock.isHeldByCurrentThread(); assert lock == map.get(key); lock.unlock(); @@ -83,6 +89,7 @@ public class KeyedLock { } } + @SuppressWarnings("serial") private final static class KeyLock extends ReentrantLock { private final AtomicInteger count = new AtomicInteger(1); @@ -92,4 +99,48 @@ public class KeyedLock { return !map.isEmpty(); } + /** + * A {@link KeyedLock} that allows to acquire a global lock that guarantees + * exclusive access to the resource the KeyedLock is guarding. + */ + public final static class GlobalLockable extends KeyedLock { + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + + @Override + public void acquire(T key) { + boolean success = false; + lock.readLock().lock(); + try { + super.acquire(key); + success = true; + } finally { + if (!success) { + lock.readLock().unlock(); + } + } + } + + @Override + public void release(T key) { + KeyLock keyLock = threadLocal.get(); + if (keyLock == null) { + throw new ElasticsearchIllegalStateException("Lock not acquired"); + } + try { + release(key, keyLock); + } finally { + lock.readLock().unlock(); + } + } + + /** + * Returns a global lock guaranteeing exclusive access to the resource + * this KeyedLock is guarding. + */ + public Lock globalLock() { + return lock.writeLock(); + } + } + } diff --git a/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java b/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java index e6df25433c5..fe76640bbab 100644 --- a/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java +++ b/src/test/java/org/elasticsearch/transport/netty/KeyedLockTests.java @@ -40,14 +40,14 @@ public class KeyedLockTests extends ElasticsearchTestCase { public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException { ConcurrentHashMap counter = new ConcurrentHashMap<>(); ConcurrentHashMap safeCounter = new ConcurrentHashMap<>(); - KeyedLock connectionLock = new KeyedLock<>(); + KeyedLock connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable() : new KeyedLock(); String[] names = new String[randomIntBetween(1, 40)]; for (int i = 0; i < names.length; i++) { names[i] = randomRealisticUnicodeOfLengthBetween(10, 20); } CountDownLatch startLatch = new CountDownLatch(1); int numThreads = randomIntBetween(3, 10); - Thread[] threads = new Thread[numThreads]; + AcquireAndReleaseThread[] threads = new AcquireAndReleaseThread[numThreads]; for (int i = 0; i < numThreads; i++) { threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter); } @@ -55,6 +55,12 @@ public class KeyedLockTests extends ElasticsearchTestCase { threads[i].start(); } startLatch.countDown(); + for (int i = 0; i < numThreads; i++) { + if (randomBoolean()) { + threads[i].incWithGlobal(); + } + } + for (int i = 0; i < numThreads; i++) { threads[i].join(); } @@ -69,13 +75,23 @@ public class KeyedLockTests extends ElasticsearchTestCase { } } + @Test(expected = ElasticsearchIllegalStateException.class) + public void checkCannotAcquireTwoLocksGlobal() throws InterruptedException { + KeyedLock.GlobalLockable connectionLock = new KeyedLock.GlobalLockable<>(); + String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); + connectionLock.acquire(name); + try { + connectionLock.acquire(name); + } finally { + connectionLock.release(name); + connectionLock.globalLock().lock(); + connectionLock.globalLock().unlock(); + } + } + @Test(expected = ElasticsearchIllegalStateException.class) public void checkCannotAcquireTwoLocks() throws InterruptedException { - ConcurrentHashMap counters = new ConcurrentHashMap<>(); - ConcurrentHashMap safeCounter = new ConcurrentHashMap<>(); - KeyedLock connectionLock = new KeyedLock<>(); - String[] names = new String[randomIntBetween(1, 40)]; - connectionLock = new KeyedLock<>(); + KeyedLock connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable() : new KeyedLock(); String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); connectionLock.acquire(name); connectionLock.acquire(name); @@ -83,11 +99,7 @@ public class KeyedLockTests extends ElasticsearchTestCase { @Test(expected = ElasticsearchIllegalStateException.class) public void checkCannotReleaseUnacquiredLock() throws InterruptedException { - ConcurrentHashMap counters = new ConcurrentHashMap<>(); - ConcurrentHashMap safeCounter = new ConcurrentHashMap<>(); - KeyedLock connectionLock = new KeyedLock<>(); - String[] names = new String[randomIntBetween(1, 40)]; - connectionLock = new KeyedLock<>(); + KeyedLock connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable() : new KeyedLock(); String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); connectionLock.release(name); } @@ -114,7 +126,7 @@ public class KeyedLockTests extends ElasticsearchTestCase { } catch (InterruptedException e) { throw new RuntimeException(); } - int numRuns = scaledRandomIntBetween(500, 5000); + int numRuns = scaledRandomIntBetween(5000, 50000); for (int i = 0; i < numRuns; i++) { String curName = names[randomInt(names.length - 1)]; connectionLock.acquire(curName); @@ -137,5 +149,32 @@ public class KeyedLockTests extends ElasticsearchTestCase { } } } + + public void incWithGlobal() { + if (connectionLock instanceof KeyedLock.GlobalLockable) { + final int iters = randomIntBetween(10, 200); + for (int i = 0; i < iters; i++) { + ((KeyedLock.GlobalLockable) connectionLock).globalLock().lock(); + try { + String curName = names[randomInt(names.length - 1)]; + Integer integer = counter.get(curName); + if (integer == null) { + counter.put(curName, 1); + } else { + counter.put(curName, integer.intValue() + 1); + } + AtomicInteger atomicInteger = new AtomicInteger(0); + AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger); + if (value == null) { + atomicInteger.incrementAndGet(); + } else { + value.incrementAndGet(); + } + } finally { + ((KeyedLock.GlobalLockable) connectionLock).globalLock().unlock(); + } + } + } + } } }