HDDS-2223. Support ReadWrite lock in LockManager. (#1564)
This commit is contained in:
parent
cdaa480dbf
commit
9700e2003a
|
@ -18,22 +18,22 @@
|
|||
package org.apache.hadoop.ozone.lock;
|
||||
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.concurrent.locks.ReentrantLock;
|
||||
import java.util.concurrent.locks.ReadWriteLock;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
|
||||
/**
|
||||
* Lock implementation which also maintains counter.
|
||||
*/
|
||||
public final class ActiveLock {
|
||||
|
||||
private Lock lock;
|
||||
private ReadWriteLock lock;
|
||||
private AtomicInteger count;
|
||||
|
||||
/**
|
||||
* Use ActiveLock#newInstance to create instance.
|
||||
*/
|
||||
private ActiveLock() {
|
||||
this.lock = new ReentrantLock();
|
||||
this.lock = new ReentrantReadWriteLock();
|
||||
this.count = new AtomicInteger(0);
|
||||
}
|
||||
|
||||
|
@ -47,21 +47,58 @@ public final class ActiveLock {
|
|||
}
|
||||
|
||||
/**
|
||||
* Acquires the lock.
|
||||
* Acquires read lock.
|
||||
*
|
||||
* <p>If the lock is not available then the current thread becomes
|
||||
* disabled for thread scheduling purposes and lies dormant until the
|
||||
* lock has been acquired.
|
||||
* <p>Acquires the read lock if the write lock is not held by
|
||||
* another thread and returns immediately.
|
||||
*
|
||||
* <p>If the write lock is held by another thread then
|
||||
* the current thread becomes disabled for thread scheduling
|
||||
* purposes and lies dormant until the read lock has been acquired.
|
||||
*/
|
||||
public void lock() {
|
||||
lock.lock();
|
||||
void readLock() {
|
||||
lock.readLock().lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the lock.
|
||||
* Attempts to release the read lock.
|
||||
*
|
||||
* <p>If the number of readers is now zero then the lock
|
||||
* is made available for write lock attempts.
|
||||
*/
|
||||
public void unlock() {
|
||||
lock.unlock();
|
||||
void readUnlock() {
|
||||
lock.readLock().unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires write lock.
|
||||
*
|
||||
* <p>Acquires the write lock if neither the read nor write lock
|
||||
* are held by another thread
|
||||
* and returns immediately, setting the write lock hold count to
|
||||
* one.
|
||||
*
|
||||
* <p>If the current thread already holds the write lock then the
|
||||
* hold count is incremented by one and the method returns
|
||||
* immediately.
|
||||
*
|
||||
* <p>If the lock is held by another thread then the current
|
||||
* thread becomes disabled for thread scheduling purposes and
|
||||
* lies dormant until the write lock has been acquired.
|
||||
*/
|
||||
void writeLock() {
|
||||
lock.writeLock().lock();
|
||||
}
|
||||
|
||||
/**
|
||||
* Attempts to release the write lock.
|
||||
*
|
||||
* <p>If the current thread is the holder of this lock then
|
||||
* the hold count is decremented. If the hold count is now
|
||||
* zero then the lock is released.
|
||||
*/
|
||||
void writeUnlock() {
|
||||
lock.writeLock().unlock();
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
@ -25,42 +25,156 @@ import org.slf4j.LoggerFactory;
|
|||
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.function.Consumer;
|
||||
|
||||
/**
|
||||
* Manages the locks on a given resource. A new lock is created for each
|
||||
* and every unique resource. Uniqueness of resource depends on the
|
||||
* {@code equals} implementation of it.
|
||||
*/
|
||||
public class LockManager<T> {
|
||||
public class LockManager<R> {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(LockManager.class);
|
||||
|
||||
private final Map<T, ActiveLock> activeLocks = new ConcurrentHashMap<>();
|
||||
private final Map<R, ActiveLock> activeLocks = new ConcurrentHashMap<>();
|
||||
private final GenericObjectPool<ActiveLock> lockPool =
|
||||
new GenericObjectPool<>(new PooledLockFactory());
|
||||
|
||||
/**
|
||||
* Creates new LockManager instance.
|
||||
* Creates new LockManager instance with the given Configuration.
|
||||
*
|
||||
* @param conf Configuration object
|
||||
*/
|
||||
public LockManager(Configuration conf) {
|
||||
int maxPoolSize = conf.getInt(HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
|
||||
public LockManager(final Configuration conf) {
|
||||
final int maxPoolSize = conf.getInt(
|
||||
HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY,
|
||||
HddsConfigKeys.HDDS_LOCK_MAX_CONCURRENCY_DEFAULT);
|
||||
lockPool.setMaxTotal(maxPoolSize);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Acquires the lock on given resource.
|
||||
*
|
||||
* <p>If the lock is not available then the current thread becomes
|
||||
* disabled for thread scheduling purposes and lies dormant until the
|
||||
* lock has been acquired.
|
||||
*
|
||||
* @param resource on which the lock has to be acquired
|
||||
* @deprecated Use {@link LockManager#writeLock} instead
|
||||
*/
|
||||
public void lock(T resource) {
|
||||
activeLocks.compute(resource, (k, v) -> {
|
||||
ActiveLock lock;
|
||||
public void lock(final R resource) {
|
||||
writeLock(resource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the lock on given resource.
|
||||
*
|
||||
* @param resource for which the lock has to be released
|
||||
* @deprecated Use {@link LockManager#writeUnlock} instead
|
||||
*/
|
||||
public void unlock(final R resource) {
|
||||
writeUnlock(resource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires the read lock on given resource.
|
||||
*
|
||||
* <p>Acquires the read lock on resource if the write lock is not held by
|
||||
* another thread and returns immediately.
|
||||
*
|
||||
* <p>If the write lock on resource is held by another thread then
|
||||
* the current thread becomes disabled for thread scheduling
|
||||
* purposes and lies dormant until the read lock has been acquired.
|
||||
*
|
||||
* @param resource on which the read lock has to be acquired
|
||||
*/
|
||||
public void readLock(final R resource) {
|
||||
acquire(resource, ActiveLock::readLock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the read lock on given resource.
|
||||
*
|
||||
* @param resource for which the read lock has to be released
|
||||
* @throws IllegalMonitorStateException if the current thread does not
|
||||
* hold this lock
|
||||
*/
|
||||
public void readUnlock(final R resource) throws IllegalMonitorStateException {
|
||||
release(resource, ActiveLock::readUnlock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires the write lock on given resource.
|
||||
*
|
||||
* <p>Acquires the write lock on resource if neither the read nor write lock
|
||||
* are held by another thread and returns immediately.
|
||||
*
|
||||
* <p>If the current thread already holds the write lock then the
|
||||
* hold count is incremented by one and the method returns
|
||||
* immediately.
|
||||
*
|
||||
* <p>If the lock is held by another thread then the current
|
||||
* thread becomes disabled for thread scheduling purposes and
|
||||
* lies dormant until the write lock has been acquired.
|
||||
*
|
||||
* @param resource on which the lock has to be acquired
|
||||
*/
|
||||
public void writeLock(final R resource) {
|
||||
acquire(resource, ActiveLock::writeLock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the write lock on given resource.
|
||||
*
|
||||
* @param resource for which the lock has to be released
|
||||
* @throws IllegalMonitorStateException if the current thread does not
|
||||
* hold this lock
|
||||
*/
|
||||
public void writeUnlock(final R resource)
|
||||
throws IllegalMonitorStateException {
|
||||
release(resource, ActiveLock::writeUnlock);
|
||||
}
|
||||
|
||||
/**
|
||||
* Acquires the lock on given resource using the provided lock function.
|
||||
*
|
||||
* @param resource on which the lock has to be acquired
|
||||
* @param lockFn function to acquire the lock
|
||||
*/
|
||||
private void acquire(final R resource, final Consumer<ActiveLock> lockFn) {
|
||||
lockFn.accept(getLockForLocking(resource));
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the lock on given resource using the provided release function.
|
||||
*
|
||||
* @param resource for which the lock has to be released
|
||||
* @param releaseFn function to release the lock
|
||||
*/
|
||||
private void release(final R resource, final Consumer<ActiveLock> releaseFn) {
|
||||
final ActiveLock lock = getLockForReleasing(resource);
|
||||
releaseFn.accept(lock);
|
||||
decrementActiveLockCount(resource);
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns {@link ActiveLock} instance for the given resource,
|
||||
* on which the lock can be acquired.
|
||||
*
|
||||
* @param resource on which the lock has to be acquired
|
||||
* @return {@link ActiveLock} instance
|
||||
*/
|
||||
private ActiveLock getLockForLocking(final R resource) {
|
||||
/*
|
||||
* While getting a lock object for locking we should
|
||||
* atomically increment the active count of the lock.
|
||||
*
|
||||
* This is to avoid cases where the selected lock could
|
||||
* be removed from the activeLocks map and returned to
|
||||
* the object pool.
|
||||
*/
|
||||
return activeLocks.compute(resource, (k, v) -> {
|
||||
final ActiveLock lock;
|
||||
try {
|
||||
if (v == null) {
|
||||
lock = lockPool.borrowObject();
|
||||
|
@ -73,22 +187,34 @@ public class LockManager<T> {
|
|||
throw new RuntimeException(ex);
|
||||
}
|
||||
return lock;
|
||||
}).lock();
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Releases the lock on given resource.
|
||||
* Returns {@link ActiveLock} instance for the given resource,
|
||||
* for which the lock has to be released.
|
||||
*
|
||||
* @param resource for which the lock has to be released
|
||||
* @return {@link ActiveLock} instance
|
||||
*/
|
||||
public void unlock(T resource) {
|
||||
ActiveLock lock = activeLocks.get(resource);
|
||||
if (lock == null) {
|
||||
// Someone is releasing a lock which was never acquired. Log and return.
|
||||
LOG.error("Trying to release the lock on {}, which was never acquired.",
|
||||
resource);
|
||||
throw new IllegalMonitorStateException("Releasing lock on resource "
|
||||
+ resource + " without acquiring lock");
|
||||
private ActiveLock getLockForReleasing(final R resource) {
|
||||
if (activeLocks.containsKey(resource)) {
|
||||
return activeLocks.get(resource);
|
||||
}
|
||||
lock.unlock();
|
||||
// Someone is releasing a lock which was never acquired.
|
||||
LOG.error("Trying to release the lock on {}, which was never acquired.",
|
||||
resource);
|
||||
throw new IllegalMonitorStateException("Releasing lock on resource "
|
||||
+ resource + " without acquiring lock");
|
||||
}
|
||||
|
||||
/**
|
||||
* Decrements the active lock count and returns the {@link ActiveLock}
|
||||
* object to pool if the active count is 0.
|
||||
*
|
||||
* @param resource resource to which the ActiveLock is associated
|
||||
*/
|
||||
private void decrementActiveLockCount(final R resource) {
|
||||
activeLocks.computeIfPresent(resource, (k, v) -> {
|
||||
v.decrementActiveCount();
|
||||
if (v.getActiveLockCount() != 0) {
|
||||
|
|
|
@ -29,34 +29,143 @@ import java.util.concurrent.atomic.AtomicBoolean;
|
|||
public class TestLockManager {
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testWithDifferentResource() {
|
||||
LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
|
||||
manager.lock("/resourceOne");
|
||||
public void testWriteLockWithDifferentResource() {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
manager.writeLock("/resourceOne");
|
||||
// This should work, as they are different resource.
|
||||
manager.lock("/resourceTwo");
|
||||
manager.unlock("/resourceOne");
|
||||
manager.unlock("/resourceTwo");
|
||||
manager.writeLock("/resourceTwo");
|
||||
manager.writeUnlock("/resourceOne");
|
||||
manager.writeUnlock("/resourceTwo");
|
||||
Assert.assertTrue(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWithSameResource() throws Exception {
|
||||
LockManager<String> manager = new LockManager<>(new OzoneConfiguration());
|
||||
manager.lock("/resourceOne");
|
||||
AtomicBoolean gotLock = new AtomicBoolean(false);
|
||||
public void testWriteLockWithSameResource() throws Exception {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
final AtomicBoolean gotLock = new AtomicBoolean(false);
|
||||
manager.writeLock("/resourceOne");
|
||||
new Thread(() -> {
|
||||
manager.lock("/resourceOne");
|
||||
manager.writeLock("/resourceOne");
|
||||
gotLock.set(true);
|
||||
manager.unlock("/resourceOne");
|
||||
manager.writeUnlock("/resourceOne");
|
||||
}).start();
|
||||
// Let's give some time for the new thread to run
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
// Since the new thread is trying to get lock on same object, it will wait.
|
||||
// Since the other thread is trying to get write lock on same object,
|
||||
// it will wait.
|
||||
Assert.assertFalse(gotLock.get());
|
||||
manager.unlock("/resourceOne");
|
||||
// Since we have released the lock, the new thread should have the lock
|
||||
// now
|
||||
// Let's give some time for the new thread to run
|
||||
manager.writeUnlock("/resourceOne");
|
||||
// Since we have released the write lock, the other thread should have
|
||||
// the lock now
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
Assert.assertTrue(gotLock.get());
|
||||
}
|
||||
|
||||
@Test(timeout = 1000)
|
||||
public void testReadLockWithDifferentResource() {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
manager.readLock("/resourceOne");
|
||||
manager.readLock("/resourceTwo");
|
||||
manager.readUnlock("/resourceOne");
|
||||
manager.readUnlock("/resourceTwo");
|
||||
Assert.assertTrue(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadLockWithSameResource() throws Exception {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
final AtomicBoolean gotLock = new AtomicBoolean(false);
|
||||
manager.readLock("/resourceOne");
|
||||
new Thread(() -> {
|
||||
manager.readLock("/resourceOne");
|
||||
gotLock.set(true);
|
||||
manager.readUnlock("/resourceOne");
|
||||
}).start();
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
// Since the new thread is trying to get read lock, it should work.
|
||||
Assert.assertTrue(gotLock.get());
|
||||
manager.readUnlock("/resourceOne");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWriteReadLockWithSameResource() throws Exception {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
final AtomicBoolean gotLock = new AtomicBoolean(false);
|
||||
manager.writeLock("/resourceOne");
|
||||
new Thread(() -> {
|
||||
manager.readLock("/resourceOne");
|
||||
gotLock.set(true);
|
||||
manager.readUnlock("/resourceOne");
|
||||
}).start();
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
// Since the other thread is trying to get read lock on same object,
|
||||
// it will wait.
|
||||
Assert.assertFalse(gotLock.get());
|
||||
manager.writeUnlock("/resourceOne");
|
||||
// Since we have released the write lock, the other thread should have
|
||||
// the lock now
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
Assert.assertTrue(gotLock.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReadWriteLockWithSameResource() throws Exception {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
final AtomicBoolean gotLock = new AtomicBoolean(false);
|
||||
manager.readLock("/resourceOne");
|
||||
new Thread(() -> {
|
||||
manager.writeLock("/resourceOne");
|
||||
gotLock.set(true);
|
||||
manager.writeUnlock("/resourceOne");
|
||||
}).start();
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
// Since the other thread is trying to get write lock on same object,
|
||||
// it will wait.
|
||||
Assert.assertFalse(gotLock.get());
|
||||
manager.readUnlock("/resourceOne");
|
||||
// Since we have released the read lock, the other thread should have
|
||||
// the lock now
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
Assert.assertTrue(gotLock.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMultiReadWriteLockWithSameResource() throws Exception {
|
||||
final LockManager<String> manager =
|
||||
new LockManager<>(new OzoneConfiguration());
|
||||
final AtomicBoolean gotLock = new AtomicBoolean(false);
|
||||
manager.readLock("/resourceOne");
|
||||
manager.readLock("/resourceOne");
|
||||
new Thread(() -> {
|
||||
manager.writeLock("/resourceOne");
|
||||
gotLock.set(true);
|
||||
manager.writeUnlock("/resourceOne");
|
||||
}).start();
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
// Since the other thread is trying to get write lock on same object,
|
||||
// it will wait.
|
||||
Assert.assertFalse(gotLock.get());
|
||||
manager.readUnlock("/resourceOne");
|
||||
//We have only released one read lock, we still hold another read lock.
|
||||
Thread.sleep(100);
|
||||
Assert.assertFalse(gotLock.get());
|
||||
manager.readUnlock("/resourceOne");
|
||||
// Since we have released the read lock, the other thread should have
|
||||
// the lock now
|
||||
// Let's give some time for the other thread to run
|
||||
Thread.sleep(100);
|
||||
Assert.assertTrue(gotLock.get());
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue