Make lock service use fair locks.

The `WatchLockService` was not using fair locks. This could result in out of order execution of queue executions of the same watch.
Fork KeydLock tests from core and make global lock fair.

Fixes elastic/elasticsearch#225

Original commit: elastic/x-pack-elasticsearch@77382e09ca
This commit is contained in:
Brian Murphy 2015-04-16 17:46:25 -04:00
parent 49bbb0c801
commit 6c54251e61
4 changed files with 353 additions and 8 deletions

View File

@ -0,0 +1,131 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.concurrent;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.util.concurrent.ConcurrentCollections;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
*/
public class FairKeyedLock<T> {
private final ConcurrentMap<T, KeyLock> map = ConcurrentCollections.newConcurrentMap();
protected final ThreadLocal<KeyLock> threadLocal = new ThreadLocal<>();
public void acquire(T key) {
while (true) {
if (threadLocal.get() != null) {
// if we are here, the thread already has the lock
throw new ElasticsearchIllegalStateException("Lock already acquired in Thread" + Thread.currentThread().getId()
+ " for key " + key);
}
KeyLock perNodeLock = map.get(key);
if (perNodeLock == null) {
KeyLock newLock = new KeyLock(true);
perNodeLock = map.putIfAbsent(key, newLock);
if (perNodeLock == null) {
newLock.lock();
threadLocal.set(newLock);
return;
}
}
assert perNodeLock != null;
int i = perNodeLock.count.get();
if (i > 0 && perNodeLock.count.compareAndSet(i, i + 1)) {
perNodeLock.lock();
threadLocal.set(perNodeLock);
return;
}
}
}
public void release(T key) {
KeyLock lock = threadLocal.get();
if (lock == null) {
throw new ElasticsearchIllegalStateException("Lock not acquired");
}
release(key, lock);
}
void release(T key, KeyLock lock) {
assert lock.isHeldByCurrentThread();
assert lock == map.get(key);
lock.unlock();
threadLocal.set(null);
int decrementAndGet = lock.count.decrementAndGet();
if (decrementAndGet == 0) {
map.remove(key, lock);
}
}
@SuppressWarnings("serial")
private final static class KeyLock extends ReentrantLock {
private final AtomicInteger count = new AtomicInteger(1);
public KeyLock(boolean fair) {
super(fair);
}
}
public boolean hasLockedKeys() {
return !map.isEmpty();
}
/**
* A {@link FairKeyedLock} that allows to acquire a global lock that guarantees
* exclusive access to the resource the KeyedLock is guarding.
*/
public final static class GlobalLockable<T> extends FairKeyedLock<T> {
private final ReadWriteLock lock = new ReentrantReadWriteLock(true);
@Override
public void acquire(T key) {
boolean success = false;
lock.readLock().lock();
try {
super.acquire(key);
success = true;
} finally {
if (!success) {
lock.readLock().unlock();
}
}
}
@Override
public void release(T key) {
KeyLock keyLock = threadLocal.get();
if (keyLock == null) {
throw new ElasticsearchIllegalStateException("Lock not acquired");
}
try {
release(key, keyLock);
} finally {
lock.readLock().unlock();
}
}
/**
* Returns a global lock guaranteeing exclusive access to the resource
* this KeyedLock is guarding.
*/
public Lock globalLock() {
return lock.writeLock();
}
}
}

View File

@ -6,7 +6,7 @@
package org.elasticsearch.watcher.watch;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.util.concurrent.KeyedLock;
import org.elasticsearch.watcher.support.concurrent.FairKeyedLock;
import java.util.concurrent.atomic.AtomicBoolean;
@ -15,7 +15,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public class WatchLockService {
private final KeyedLock<String> watchLocks = new KeyedLock<>();
private final FairKeyedLock<String> watchLocks = new FairKeyedLock<>();
private final AtomicBoolean running = new AtomicBoolean(false);
public Lock acquire(String name) {
@ -52,16 +52,16 @@ public class WatchLockService {
}
}
KeyedLock<String> getWatchLocks() {
FairKeyedLock<String> getWatchLocks() {
return watchLocks;
}
public static class Lock {
private final String name;
private final KeyedLock<String> watchLocks;
private final FairKeyedLock<String> watchLocks;
private Lock(String name, KeyedLock<String> watchLocks) {
private Lock(String name, FairKeyedLock<String> watchLocks) {
this.name = name;
this.watchLocks = watchLocks;

View File

@ -0,0 +1,168 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License;
* you may not use this file except in compliance with the Elastic License.
*/
package org.elasticsearch.watcher.support.concurrent;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.common.netty.util.internal.ConcurrentHashMap;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.hamcrest.Matchers;
import org.junit.Test;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.not;
/**
*/
public class FairKeyedLockTests extends ElasticsearchTestCase{
@Test
public void checkIfMapEmptyAfterLotsOfAcquireAndReleases() throws InterruptedException {
ConcurrentHashMap<String, Integer> counter = new ConcurrentHashMap<>();
ConcurrentHashMap<String, AtomicInteger> safeCounter = new ConcurrentHashMap<>();
FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<String>() : new FairKeyedLock<String>();
String[] names = new String[randomIntBetween(1, 40)];
for (int i = 0; i < names.length; i++) {
names[i] = randomRealisticUnicodeOfLengthBetween(10, 20);
}
CountDownLatch startLatch = new CountDownLatch(1);
int numThreads = randomIntBetween(3, 10);
AcquireAndReleaseThread[] threads = new AcquireAndReleaseThread[numThreads];
for (int i = 0; i < numThreads; i++) {
threads[i] = new AcquireAndReleaseThread(startLatch, connectionLock, names, counter, safeCounter);
}
for (int i = 0; i < numThreads; i++) {
threads[i].start();
}
startLatch.countDown();
for (int i = 0; i < numThreads; i++) {
if (randomBoolean()) {
threads[i].incWithGlobal();
}
}
for (int i = 0; i < numThreads; i++) {
threads[i].join();
}
assertThat(connectionLock.hasLockedKeys(), equalTo(false));
Set<Map.Entry<String, Integer>> entrySet = counter.entrySet();
assertThat(counter.size(), equalTo(safeCounter.size()));
for (Map.Entry<String, Integer> entry : entrySet) {
AtomicInteger atomicInteger = safeCounter.get(entry.getKey());
assertThat(atomicInteger, not(Matchers.nullValue()));
assertThat(atomicInteger.get(), equalTo(entry.getValue()));
}
}
@Test(expected = ElasticsearchIllegalStateException.class)
public void checkCannotAcquireTwoLocksGlobal() throws InterruptedException {
FairKeyedLock.GlobalLockable<String> connectionLock = new FairKeyedLock.GlobalLockable<>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
try {
connectionLock.acquire(name);
} finally {
connectionLock.release(name);
connectionLock.globalLock().lock();
connectionLock.globalLock().unlock();
}
}
@Test(expected = ElasticsearchIllegalStateException.class)
public void checkCannotAcquireTwoLocks() throws InterruptedException {
FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<String>() : new FairKeyedLock<String>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.acquire(name);
connectionLock.acquire(name);
}
@Test(expected = ElasticsearchIllegalStateException.class)
public void checkCannotReleaseUnacquiredLock() throws InterruptedException {
FairKeyedLock<String> connectionLock = randomBoolean() ? new FairKeyedLock.GlobalLockable<String>() : new FairKeyedLock<String>();
String name = randomRealisticUnicodeOfLength(scaledRandomIntBetween(10, 50));
connectionLock.release(name);
}
public static class AcquireAndReleaseThread extends Thread {
private CountDownLatch startLatch;
FairKeyedLock<String> connectionLock;
String[] names;
ConcurrentHashMap<String, Integer> counter;
ConcurrentHashMap<String, AtomicInteger> safeCounter;
public AcquireAndReleaseThread(CountDownLatch startLatch, FairKeyedLock<String> connectionLock, String[] names,
ConcurrentHashMap<String, Integer> counter, ConcurrentHashMap<String, AtomicInteger> safeCounter) {
this.startLatch = startLatch;
this.connectionLock = connectionLock;
this.names = names;
this.counter = counter;
this.safeCounter = safeCounter;
}
public void run() {
try {
startLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException();
}
int numRuns = scaledRandomIntBetween(5000, 50000);
for (int i = 0; i < numRuns; i++) {
String curName = names[randomInt(names.length - 1)];
connectionLock.acquire(curName);
try {
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
} finally {
connectionLock.release(curName);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
}
}
public void incWithGlobal() {
if (connectionLock instanceof FairKeyedLock.GlobalLockable) {
final int iters = randomIntBetween(10, 200);
for (int i = 0; i < iters; i++) {
((FairKeyedLock.GlobalLockable) connectionLock).globalLock().lock();
try {
String curName = names[randomInt(names.length - 1)];
Integer integer = counter.get(curName);
if (integer == null) {
counter.put(curName, 1);
} else {
counter.put(curName, integer.intValue() + 1);
}
AtomicInteger atomicInteger = new AtomicInteger(0);
AtomicInteger value = safeCounter.putIfAbsent(curName, atomicInteger);
if (value == null) {
atomicInteger.incrementAndGet();
} else {
value.incrementAndGet();
}
} finally {
((FairKeyedLock.GlobalLockable) connectionLock).globalLock().unlock();
}
}
}
}
}
}

View File

@ -9,9 +9,11 @@ import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.test.ElasticsearchTestCase;
import org.junit.Test;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.hamcrest.Matchers.*;
/**
*/
@ -54,4 +56,48 @@ public class WatchLockServiceTests extends ElasticsearchTestCase {
lockService.stop();
}
@Test
public void testLocking_fair() throws Exception {
final WatchLockService lockService = new WatchLockService();
lockService.start();
final AtomicInteger value = new AtomicInteger(0);
List<Thread> threads = new ArrayList<>();
class FairRunner implements Runnable {
final int expectedValue;
FairRunner(int expectedValue) {
this.expectedValue = expectedValue;
}
@Override
public void run() {
WatchLockService.Lock lock = lockService.acquire("_name");
try {
int actualValue = value.getAndIncrement();
assertThat(actualValue, equalTo(expectedValue));
Thread.sleep(50);
} catch(InterruptedException ie) {
} finally {
lock.release();
}
}
}
for(int i = 0; i < 100; ++i) {
FairRunner f = new FairRunner(i);
threads.add(new Thread(f));
}
for(Thread t : threads) {
t.start();
Thread.sleep(10);
}
for(Thread t : threads) {
t.join();
}
}
}