Add read/write lock semantics to KeyedLock

This commit is contained in:
Simon Willnauer 2014-07-14 16:52:20 +02:00
parent b0c0ff8ac0
commit e30176cc69
2 changed files with 107 additions and 17 deletions

View File

@ -23,7 +23,10 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/** /**
* This class manages locks. Locks can be accessed with an identifier and are * This class manages locks. Locks can be accessed with an identifier and are
@ -34,18 +37,17 @@ import java.util.concurrent.locks.ReentrantLock;
* A Thread can acquire a lock only once. * A Thread can acquire a lock only once.
* *
* */ * */
public class KeyedLock<T> { public class KeyedLock<T> {
private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMap(); private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMap();
private final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<>(); protected final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<>();
public void acquire(T key) { public void acquire(T key) {
while (true) { while (true) {
if (threadLocal.get() != null) { if (threadLocal.get() != null) {
// if we are here, the thread already has the lock // if we are here, the thread already has the lock
throw new ElasticsearchIllegalStateException("Lock already accquired in Thread" + Thread.currentThread().getId() throw new ElasticsearchIllegalStateException("Lock already acquired in Thread" + Thread.currentThread().getId()
+ " for key " + key); + " for key " + key);
} }
KeyLock perNodeLock = map.get(key); KeyLock perNodeLock = map.get(key);
@ -71,8 +73,12 @@ public class KeyedLock<T> {
public void release(T key) { public void release(T key) {
KeyLock lock = threadLocal.get(); KeyLock lock = threadLocal.get();
if (lock == null) { if (lock == null) {
throw new ElasticsearchIllegalStateException("Lock not accquired"); throw new ElasticsearchIllegalStateException("Lock not acquired");
} }
release(key, lock);
}
void release(T key, KeyLock lock) {
assert lock.isHeldByCurrentThread(); assert lock.isHeldByCurrentThread();
assert lock == map.get(key); assert lock == map.get(key);
lock.unlock(); lock.unlock();
@ -83,6 +89,7 @@ public class KeyedLock<T> {
} }
} }
@SuppressWarnings("serial") @SuppressWarnings("serial")
private final static class KeyLock extends ReentrantLock { private final static class KeyLock extends ReentrantLock {
private final AtomicInteger count = new AtomicInteger(1); private final AtomicInteger count = new AtomicInteger(1);
@ -92,4 +99,48 @@ public class KeyedLock<T> {
return !map.isEmpty(); return !map.isEmpty();
} }
/**
* A {@link KeyedLock} that allows to acquire a global lock that guarantees
* exclusive access to the resource the KeyedLock is guarding.
*/
public final static class GlobalLockable<T> extends KeyedLock<T> {
private final ReadWriteLock lock = new ReentrantReadWriteLock();
@Override
public void acquire(T key) {
boolean success = false;
lock.readLock().lock();
try {
super.acquire(key);
success = true;
} finally {
if (!success) {
lock.readLock().unlock();
}
}
}
@Override
public void release(T key) {
KeyLock keyLock = threadLocal.get();
if (keyLock == null) {
throw new ElasticsearchIllegalStateException("Lock not acquired");
}
try {
release(key, keyLock);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns a global lock guaranteeing exclusive access to the resource
* this KeyedLock is guarding.
*/
public Lock globalLock() {
return lock.writeLock();
}
}
} }

View File

@ -40,14 +40,14 @@ public class KeyedLockTests extends ElasticsearchTestCase {
public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException { public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>(); ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>(); ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
KeyedLock<String> connectionLock = new KeyedLock<>(); KeyedLock<String> connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable<String>() : new KeyedLock<String>();
String[] names = new String[randomIntBetween(1, 40)]; String[] names = new String[randomIntBetween(1, 40)];
for (int i = 0; i < names.length; i++) { for (int i = 0; i < names.length; i++) {
names[i] = randomRealisticUnicodeOfLengthBetween(10, 20); names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);
} }
CountDownLatch startLatch = new CountDownLatch(1); CountDownLatch startLatch = new CountDownLatch(1);
int numThreads = randomIntBetween(3, 10); int numThreads = randomIntBetween(3, 10);
Thread[] threads = new Thread[numThreads]; AcquireAndReleaseThread[] threads = new AcquireAndReleaseThread[numThreads];
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter); threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter);
} }
@ -55,6 +55,12 @@ public class KeyedLockTests extends ElasticsearchTestCase {
threads[i].start(); threads[i].start();
} }
startLatch.countDown(); startLatch.countDown();
for (int i = 0; i < numThreads; i++) {
if (randomBoolean()) {
threads[i].incWithGlobal();
}
}
for (int i = 0; i < numThreads; i++) { for (int i = 0; i < numThreads; i++) {
threads[i].join(); threads[i].join();
} }
@ -69,13 +75,23 @@ public class KeyedLockTests extends ElasticsearchTestCase {
} }
} }
@Test(expected = ElasticsearchIllegalStateException.class)
public void checkCannotAcquireTwoLocksGlobal() throws InterruptedException {
KeyedLock.GlobalLockable<String> connectionLock = new KeyedLock.GlobalLockable<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
try {
connectionLock.acquire(name);
} finally {
connectionLock.release(name);
connectionLock.globalLock().lock();
connectionLock.globalLock().unlock();
}
}
@Test(expected = ElasticsearchIllegalStateException.class) @Test(expected = ElasticsearchIllegalStateException.class)
public void checkCannotAcquireTwoLocks() throws InterruptedException { public void checkCannotAcquireTwoLocks() throws InterruptedException {
ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<>(); KeyedLock<String> connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable<String>() : new KeyedLock<String>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
KeyedLock<String> connectionLock = new KeyedLock<>();
String[] names = new String[randomIntBetween(1, 40)];
connectionLock = new KeyedLock<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name); connectionLock.acquire(name);
connectionLock.acquire(name); connectionLock.acquire(name);
@ -83,11 +99,7 @@ public class KeyedLockTests extends ElasticsearchTestCase {
@Test(expected = ElasticsearchIllegalStateException.class) @Test(expected = ElasticsearchIllegalStateException.class)
public void checkCannotReleaseUnacquiredLock() throws InterruptedException { public void checkCannotReleaseUnacquiredLock() throws InterruptedException {
ConcurrentHashMap<String, Integer> counters = new ConcurrentHashMap<>(); KeyedLock<String> connectionLock = randomBoolean() ? new KeyedLock.GlobalLockable<String>() : new KeyedLock<String>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
KeyedLock<String> connectionLock = new KeyedLock<>();
String[] names = new String[randomIntBetween(1, 40)];
connectionLock = new KeyedLock<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50)); String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.release(name); connectionLock.release(name);
} }
@ -114,7 +126,7 @@ public class KeyedLockTests extends ElasticsearchTestCase {
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new RuntimeException(); throw new RuntimeException();
} }
int numRuns = scaledRandomIntBetween(500, 5000); int numRuns = scaledRandomIntBetween(5000, 50000);
for (int i = 0; i < numRuns; i++) { for (int i = 0; i < numRuns; i++) {
String curName = names[randomInt(names.length - 1)]; String curName = names[randomInt(names.length - 1)];
connectionLock.acquire(curName); connectionLock.acquire(curName);
@ -137,5 +149,32 @@ public class KeyedLockTests extends ElasticsearchTestCase {
} }
} }
} }
public void incWithGlobal() {
if (connectionLock instanceof KeyedLock.GlobalLockable) {
final int iters = randomIntBetween(10, 200);
for (int i = 0; i < iters; i++) {
((KeyedLock.GlobalLockable) connectionLock).globalLock().lock();
try {
String curName = names[randomInt(names.length - 1)];
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
} finally {
((KeyedLock.GlobalLockable) connectionLock).globalLock().unlock();
}
}
}
}
} }
} }