BAEL-4524 Acquire a Lock By Key in Java (#11872)
This commit is contained in:
parent
fee01d60fb
commit
c6bd572eda
@ -0,0 +1,43 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class shows examples of how you should use the lock
|
||||||
|
*
|
||||||
|
*/
|
||||||
|
public class ExampleUsage {
|
||||||
|
|
||||||
|
void doWithSimpleExclusiveLock(String key) {
|
||||||
|
SimpleExclusiveLockByKey simpleExclusiveLockByKey = new SimpleExclusiveLockByKey();
|
||||||
|
if (simpleExclusiveLockByKey.tryLock(key)) {
|
||||||
|
try {
|
||||||
|
// do stuff
|
||||||
|
} finally {
|
||||||
|
// it is very important to unlock in the finally block to avoid locking keys forever
|
||||||
|
simpleExclusiveLockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// A concrete example can be found in the unit tests
|
||||||
|
void doWithLock(String key) {
|
||||||
|
LockByKey lockByKey = new LockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
// do stuff
|
||||||
|
} finally {
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// It works exactly the same as with locks
|
||||||
|
void doWithSemaphore(String key) {
|
||||||
|
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
// do stuff
|
||||||
|
} finally {
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,41 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
import java.util.concurrent.locks.Lock;
|
||||||
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
|
public class LockByKey {
|
||||||
|
|
||||||
|
private static class LockWrapper {
|
||||||
|
private final Lock lock = new ReentrantLock();
|
||||||
|
private final AtomicInteger numberOfThreadsInQueue = new AtomicInteger(1);
|
||||||
|
|
||||||
|
private LockWrapper addThreadInQueue() {
|
||||||
|
numberOfThreadsInQueue.incrementAndGet();
|
||||||
|
return this;
|
||||||
|
}
|
||||||
|
|
||||||
|
private int removeThreadFromQueue() {
|
||||||
|
return numberOfThreadsInQueue.decrementAndGet();
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static ConcurrentHashMap<String, LockWrapper> locks = new ConcurrentHashMap<String, LockWrapper>();
|
||||||
|
|
||||||
|
public void lock(String key) {
|
||||||
|
LockWrapper lockWrapper = locks.compute(key, (k, v) -> v == null ? new LockWrapper() : v.addThreadInQueue());
|
||||||
|
lockWrapper.lock.lock();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unlock(String key) {
|
||||||
|
LockWrapper lockWrapper = locks.get(key);
|
||||||
|
lockWrapper.lock.unlock();
|
||||||
|
if (lockWrapper.removeThreadFromQueue() == 0) {
|
||||||
|
// NB : We pass in the specific value to remove to handle the case where another thread would queue right before the removal
|
||||||
|
locks.remove(key, lockWrapper);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,18 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
public class SimpleExclusiveLockByKey {
|
||||||
|
|
||||||
|
private static Set<String> usedKeys= ConcurrentHashMap.newKeySet();
|
||||||
|
|
||||||
|
public boolean tryLock(String key) {
|
||||||
|
return usedKeys.add(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unlock(String key) {
|
||||||
|
usedKeys.remove(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,25 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
import java.util.concurrent.Semaphore;
|
||||||
|
|
||||||
|
public class SimultaneousEntriesLockByKey {
|
||||||
|
|
||||||
|
private static final int ALLOWED_THREADS = 2;
|
||||||
|
|
||||||
|
private static ConcurrentHashMap<String, Semaphore> semaphores = new ConcurrentHashMap<String, Semaphore>();
|
||||||
|
|
||||||
|
public void lock(String key) {
|
||||||
|
Semaphore semaphore = semaphores.compute(key, (k, v) -> v == null ? new Semaphore(ALLOWED_THREADS) : v);
|
||||||
|
semaphore.acquireUninterruptibly();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void unlock(String key) {
|
||||||
|
Semaphore semaphore = semaphores.get(key);
|
||||||
|
semaphore.release();
|
||||||
|
if (semaphore.availablePermits() == ALLOWED_THREADS) {
|
||||||
|
semaphores.remove(key, semaphore);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,106 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class LockByKeyUnitTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenNoLockedKey_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
AtomicBoolean threadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread thread = new Thread(() -> {
|
||||||
|
String key = "key";
|
||||||
|
LockByKey lockByKey = new LockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
threadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
thread.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(threadWasExecuted.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenLockedKey_WhenLock_ThenFailure() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
LockByKey lockByKey = new LockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey = new Thread(() -> {
|
||||||
|
LockByKey otherLockByKey = new LockByKey();
|
||||||
|
otherLockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
threadLockingOnAnotherKey.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertFalse(anotherThreadWasExecuted.get());
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenAnotherKeyLocked_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
LockByKey lockByKey = new LockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey = new Thread(() -> {
|
||||||
|
String anotherKey = "anotherKey";
|
||||||
|
LockByKey otherLockByKey = new LockByKey();
|
||||||
|
otherLockByKey.lock(anotherKey);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKey.unlock(anotherKey);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
threadLockingOnAnotherKey.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(anotherThreadWasExecuted.get());
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenUnlockedKey_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
LockByKey lockByKey = new LockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey = new Thread(() -> {
|
||||||
|
LockByKey otherLockByKey = new LockByKey();
|
||||||
|
otherLockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
threadLockingOnAnotherKey.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(anotherThreadWasExecuted.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,51 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.lang.reflect.Field;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.BeforeEach;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class SimpleExclusiveLockByKeyUnitTest {
|
||||||
|
|
||||||
|
@BeforeEach
|
||||||
|
void cleanUpLocks() throws Exception {
|
||||||
|
Field field = SimpleExclusiveLockByKey.class.getDeclaredField("usedKeys");
|
||||||
|
field.setAccessible(true);
|
||||||
|
field.set(null, ConcurrentHashMap.newKeySet());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenNoLockedKey_WhenTryLock_ThenSuccess() {
|
||||||
|
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
|
||||||
|
assertTrue(lockByKey.tryLock("key"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenLockedKey_WhenTryLock_ThenFailure() {
|
||||||
|
String key = "key";
|
||||||
|
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
|
||||||
|
lockByKey.tryLock(key);
|
||||||
|
assertFalse(lockByKey.tryLock(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenAnotherKeyLocked_WhenTryLock_ThenSuccess() {
|
||||||
|
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
|
||||||
|
lockByKey.tryLock("other");
|
||||||
|
assertTrue(lockByKey.tryLock("key"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenUnlockedKey_WhenTryLock_ThenSuccess() {
|
||||||
|
String key = "key";
|
||||||
|
SimpleExclusiveLockByKey lockByKey = new SimpleExclusiveLockByKey();
|
||||||
|
lockByKey.tryLock(key);
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
assertTrue(lockByKey.tryLock(key));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -0,0 +1,146 @@
|
|||||||
|
package com.baeldung.lockbykey;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
|
||||||
|
public class SimultaneousEntriesLockByKeyUnitTest {
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenNoKeyUsed_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
AtomicBoolean threadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread thread = new Thread(() -> {
|
||||||
|
String key = "key";
|
||||||
|
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
threadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
thread.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(threadWasExecuted.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenKeyLockedWithRemainingPermits_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey = new Thread(() -> {
|
||||||
|
SimultaneousEntriesLockByKey otherLockByKeyWithSemaphore = new SimultaneousEntriesLockByKey();
|
||||||
|
otherLockByKeyWithSemaphore.lock(key);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKeyWithSemaphore.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
threadLockingOnAnotherKey.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(anotherThreadWasExecuted.get());
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenKeyLockedWithNoRemainingPermits_WhenLock_ThenFailure() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey1 = new Thread(() -> {
|
||||||
|
SimultaneousEntriesLockByKey otherLockByKeyWithSemaphore = new SimultaneousEntriesLockByKey();
|
||||||
|
otherLockByKeyWithSemaphore.lock(key);
|
||||||
|
try {
|
||||||
|
Thread.sleep(200); // make sure this thread will release the lock after the assertion
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
|
||||||
|
} finally {
|
||||||
|
otherLockByKeyWithSemaphore.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
Thread threadLockingOnAnotherKey2 = new Thread(() -> {
|
||||||
|
SimultaneousEntriesLockByKey otherLockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
try {
|
||||||
|
Thread.sleep(50); // make sure thread1 will acquire the key first
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
|
otherLockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
threadLockingOnAnotherKey1.start();
|
||||||
|
threadLockingOnAnotherKey2.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertFalse(anotherThreadWasExecuted.get());
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenAnotherKeyLocked_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey = new Thread(() -> {
|
||||||
|
String anotherKey = "anotherKey";
|
||||||
|
SimultaneousEntriesLockByKey otherLockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
otherLockByKey.lock(anotherKey);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKey.unlock(anotherKey);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
threadLockingOnAnotherKey.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(anotherThreadWasExecuted.get());
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void givenUnlockedKey_WhenLock_ThenSuccess() throws InterruptedException {
|
||||||
|
String key = "key";
|
||||||
|
SimultaneousEntriesLockByKey lockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
lockByKey.lock(key);
|
||||||
|
AtomicBoolean anotherThreadWasExecuted = new AtomicBoolean(false);
|
||||||
|
Thread threadLockingOnAnotherKey = new Thread(() -> {
|
||||||
|
SimultaneousEntriesLockByKey otherLockByKey = new SimultaneousEntriesLockByKey();
|
||||||
|
otherLockByKey.lock(key);
|
||||||
|
try {
|
||||||
|
anotherThreadWasExecuted.set(true);
|
||||||
|
} finally {
|
||||||
|
otherLockByKey.unlock(key);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
try {
|
||||||
|
lockByKey.unlock(key);
|
||||||
|
threadLockingOnAnotherKey.start();
|
||||||
|
Thread.sleep(100);
|
||||||
|
} finally {
|
||||||
|
assertTrue(anotherThreadWasExecuted.get());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user