diff --git a/core-java/src/main/java/com/baeldung/concurrent/locks/ReentrantLockWithCondition.java b/core-java/src/main/java/com/baeldung/concurrent/locks/ReentrantLockWithCondition.java new file mode 100644 index 0000000000..4f061d2efd --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/locks/ReentrantLockWithCondition.java @@ -0,0 +1,83 @@ +package com.baeldung.concurrent.locks; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Stack; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Condition; +import java.util.concurrent.locks.ReentrantLock; + +import static java.lang.Thread.sleep; + +public class ReentrantLockWithCondition { + + static Logger logger = LoggerFactory.getLogger(ReentrantLockWithCondition.class); + + Stack stack = new Stack<>(); + int CAPACITY = 5; + + ReentrantLock lock = new ReentrantLock(); + Condition stackEmptyCondition = lock.newCondition(); + Condition stackFullCondition = lock.newCondition(); + + public void pushToStack(String item) throws InterruptedException { + try { + lock.lock(); + if (stack.size() == CAPACITY) { + logger.info(Thread.currentThread().getName() + " wait on stack full"); + stackFullCondition.await(); + } + logger.info("Pushing the item " + item); + stack.push(item); + stackEmptyCondition.signalAll(); + } finally { + lock.unlock(); + } + + } + + public String popFromStack() throws InterruptedException { + try { + lock.lock(); + if (stack.size() == 0) { + logger.info(Thread.currentThread().getName() + " wait on stack empty"); + stackEmptyCondition.await(); + } + return stack.pop(); + } finally { + stackFullCondition.signalAll(); + lock.unlock(); + } + } + + public static void main(String[] args) { + final int threadCount = 2; + ReentrantLockWithCondition object = new ReentrantLockWithCondition(); + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + service.execute(() -> { + for (int i = 0; i < 10; i++) { + try { + object.pushToStack("Item " + i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }); + + service.execute(() -> { + for (int i = 0; i < 10; i++) { + try { + logger.info("Item popped " + object.popFromStack()); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + }); + + service.shutdown(); + } +} diff --git a/core-java/src/main/java/com/baeldung/concurrent/locks/SharedObjectWithLock.java b/core-java/src/main/java/com/baeldung/concurrent/locks/SharedObjectWithLock.java new file mode 100644 index 0000000000..b6a4615638 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/locks/SharedObjectWithLock.java @@ -0,0 +1,92 @@ +package com.baeldung.concurrent.locks; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReentrantLock; + +import static java.lang.Thread.sleep; + +public class SharedObjectWithLock { + + Logger logger = LoggerFactory.getLogger(SharedObjectWithLock.class); + + ReentrantLock lock = new ReentrantLock(true); + + int counter = 0; + + public void perform() { + + lock.lock(); + logger.info("Thread - " + Thread.currentThread().getName() + " acquired the lock"); + try { + logger.info("Thread - " + Thread.currentThread().getName() + " processing"); + counter++; + } catch (Exception exception) { + logger.error(" Interrupted Exception ", exception); + } finally { + lock.unlock(); + logger.info("Thread - " + Thread.currentThread().getName() + " released the lock"); + } + } + + public void performTryLock() { + + logger.info("Thread - " + Thread.currentThread().getName() + " attempting to acquire the lock"); + try { + boolean isLockAcquired = lock.tryLock(2, TimeUnit.SECONDS); + if (isLockAcquired) { + try { + logger.info("Thread - " + Thread.currentThread().getName() + " acquired the lock"); + + logger.info("Thread - " + Thread.currentThread().getName() + " processing"); + sleep(1000); + } finally { + lock.unlock(); + logger.info("Thread - " + Thread.currentThread().getName() + " released the lock"); + + } + } + } catch (InterruptedException exception) { + logger.error(" Interrupted Exception ", exception); + } + logger.info("Thread - " + Thread.currentThread().getName() + " could not acquire the lock"); + } + + public ReentrantLock getLock() { + return lock; + } + + boolean isLocked() { + return lock.isLocked(); + } + + boolean hasQueuedThreads() { + return lock.hasQueuedThreads(); + } + + int getCounter() { + return counter; + } + + public static void main(String[] args) { + + final int threadCount = 2; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + final SharedObjectWithLock object = new SharedObjectWithLock(); + + service.execute(() -> { + object.perform(); + }); + service.execute(() -> { + object.performTryLock(); + }); + + service.shutdown(); + + } + +} diff --git a/core-java/src/main/java/com/baeldung/concurrent/locks/StampedLockDemo.java b/core-java/src/main/java/com/baeldung/concurrent/locks/StampedLockDemo.java new file mode 100644 index 0000000000..0b0dbc72cb --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/locks/StampedLockDemo.java @@ -0,0 +1,104 @@ +package com.baeldung.concurrent.locks; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.StampedLock; + +import static java.lang.Thread.sleep; + +public class StampedLockDemo { + Map map = new HashMap<>(); + Logger logger = LoggerFactory.getLogger(StampedLockDemo.class); + + private final StampedLock lock = new StampedLock(); + + public void put(String key, String value) throws InterruptedException { + long stamp = lock.writeLock(); + + try { + logger.info(Thread.currentThread().getName() + " acquired the write lock with stamp " + stamp); + map.put(key, value); + } finally { + lock.unlockWrite(stamp); + logger.info(Thread.currentThread().getName() + " unlocked the write lock with stamp " + stamp); + } + } + + public String get(String key) throws InterruptedException { + long stamp = lock.readLock(); + logger.info(Thread.currentThread().getName() + " acquired the read lock with stamp " + stamp); + try { + sleep(5000); + return map.get(key); + + } finally { + lock.unlockRead(stamp); + logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp); + + } + + } + + public String readWithOptimisticLock(String key) throws InterruptedException { + long stamp = lock.tryOptimisticRead(); + String value = map.get(key); + + if (!lock.validate(stamp)) { + stamp = lock.readLock(); + try { + sleep(5000); + return map.get(key); + + } finally { + lock.unlock(stamp); + logger.info(Thread.currentThread().getName() + " unlocked the read lock with stamp " + stamp); + + } + } + return value; + } + + public static void main(String[] args) { + final int threadCount = 4; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + StampedLockDemo object = new StampedLockDemo(); + + Runnable writeTask = () -> { + + try { + object.put("key1", "value1"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }; + Runnable readTask = () -> { + + try { + object.get("key1"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }; + Runnable readOptimisticTask = () -> { + + try { + object.readWithOptimisticLock("key1"); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }; + service.submit(writeTask); + service.submit(writeTask); + service.submit(readTask); + service.submit(readOptimisticTask); + + service.shutdown(); + + } + +} diff --git a/core-java/src/main/java/com/baeldung/concurrent/locks/SynchronizedHashMapWithRWLock.java b/core-java/src/main/java/com/baeldung/concurrent/locks/SynchronizedHashMapWithRWLock.java new file mode 100644 index 0000000000..83b8b34fe9 --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/locks/SynchronizedHashMapWithRWLock.java @@ -0,0 +1,120 @@ +package com.baeldung.concurrent.locks; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import static java.lang.Thread.sleep; + +public class SynchronizedHashMapWithRWLock { + + static Map syncHashMap = new HashMap<>(); + Logger logger = LoggerFactory.getLogger(SynchronizedHashMapWithRWLock.class); + + private final ReadWriteLock lock = new ReentrantReadWriteLock(); + private final Lock readLock = lock.readLock(); + private final Lock writeLock = lock.writeLock(); + + public void put(String key, String value) throws InterruptedException { + + try { + writeLock.lock(); + logger.info(Thread.currentThread().getName() + " writing"); + syncHashMap.put(key, value); + sleep(1000); + } finally { + writeLock.unlock(); + } + + } + + public String get(String key) { + try { + readLock.lock(); + logger.info(Thread.currentThread().getName() + " reading"); + return syncHashMap.get(key); + } finally { + readLock.unlock(); + } + } + + public String remove(String key) { + try { + writeLock.lock(); + return syncHashMap.remove(key); + } finally { + writeLock.unlock(); + } + } + + public boolean containsKey(String key) { + try { + readLock.lock(); + return syncHashMap.containsKey(key); + } finally { + readLock.unlock(); + } + } + + boolean isReadLockAvailable() { + return readLock.tryLock(); + } + + public static void main(String[] args) throws InterruptedException { + + final int threadCount = 3; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock(); + + service.execute(new Thread(new Writer(object), "Writer")); + service.execute(new Thread(new Reader(object), "Reader1")); + service.execute(new Thread(new Reader(object), "Reader2")); + + service.shutdown(); + } + + private static class Reader implements Runnable { + + SynchronizedHashMapWithRWLock object; + + public Reader(SynchronizedHashMapWithRWLock object) { + this.object = object; + } + + @Override + public void run() { + for (int i = 0; i < 10; i++) { + object.get("key" + i); + } + } + } + + private static class Writer implements Runnable { + + SynchronizedHashMapWithRWLock object; + + public Writer(SynchronizedHashMapWithRWLock object) { + this.object = object; + } + + @Override + public void run() { + for (int i = 0; i < 10; i++) { + try { + object.put("key" + i, "value" + i); + sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + } + +} diff --git a/core-java/src/test/java/com/baeldung/concurrent/locks/SharedObjectWithLockManualTest.java b/core-java/src/test/java/com/baeldung/concurrent/locks/SharedObjectWithLockManualTest.java new file mode 100644 index 0000000000..9b82ced642 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/locks/SharedObjectWithLockManualTest.java @@ -0,0 +1,75 @@ +package com.baeldung.concurrent.locks; + +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static junit.framework.TestCase.assertEquals; + +public class SharedObjectWithLockManualTest { + + @Test + public void whenLockAcquired_ThenLockedIsTrue() { + final SharedObjectWithLock object = new SharedObjectWithLock(); + + final int threadCount = 2; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + + executeThreads(object, threadCount, service); + + assertEquals(true, object.isLocked()); + + service.shutdown(); + } + + @Test + public void whenLocked_ThenQueuedThread() { + final int threadCount = 4; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + final SharedObjectWithLock object = new SharedObjectWithLock(); + + executeThreads(object, threadCount, service); + + assertEquals(object.hasQueuedThreads(), true); + + service.shutdown(); + + } + + public void whenTryLock_ThenQueuedThread() { + final SharedObjectWithLock object = new SharedObjectWithLock(); + + final int threadCount = 2; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + + executeThreads(object, threadCount, service); + + assertEquals(true, object.isLocked()); + + service.shutdown(); + } + + @Test + public void whenGetCount_ThenCorrectCount() throws InterruptedException { + final int threadCount = 4; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + final SharedObjectWithLock object = new SharedObjectWithLock(); + + executeThreads(object, threadCount, service); + Thread.sleep(1000); + assertEquals(object.getCounter(), 4); + + service.shutdown(); + + } + + private void executeThreads(SharedObjectWithLock object, int threadCount, ExecutorService service) { + for (int i = 0; i < threadCount; i++) { + service.execute(() -> { + object.perform(); + }); + } + } + +} diff --git a/core-java/src/test/java/com/baeldung/concurrent/locks/SynchronizedHashMapWithRWLockManualTest.java b/core-java/src/test/java/com/baeldung/concurrent/locks/SynchronizedHashMapWithRWLockManualTest.java new file mode 100644 index 0000000000..fd6cf08442 --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/locks/SynchronizedHashMapWithRWLockManualTest.java @@ -0,0 +1,58 @@ +package com.baeldung.concurrent.locks; + +import jdk.nashorn.internal.ir.annotations.Ignore; +import org.junit.Test; + +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; + +import static junit.framework.TestCase.assertEquals; + +public class SynchronizedHashMapWithRWLockManualTest { + + @Test + public void whenWriting_ThenNoReading() { + SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock(); + final int threadCount = 3; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + + executeWriterThreads(object, threadCount, service); + + assertEquals(object.isReadLockAvailable(), false); + + service.shutdown(); + } + + @Test + public void whenReading_ThenMultipleReadingAllowed() { + SynchronizedHashMapWithRWLock object = new SynchronizedHashMapWithRWLock(); + final int threadCount = 5; + final ExecutorService service = Executors.newFixedThreadPool(threadCount); + + executeReaderThreads(object, threadCount, service); + + assertEquals(object.isReadLockAvailable(), true); + + service.shutdown(); + } + + private void executeWriterThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) { + for (int i = 0; i < threadCount; i++) { + service.execute(() -> { + try { + object.put("key" + threadCount, "value" + threadCount); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + } + } + + private void executeReaderThreads(SynchronizedHashMapWithRWLock object, int threadCount, ExecutorService service) { + for (int i = 0; i < threadCount; i++) + service.execute(() -> { + object.get("key" + threadCount); + }); + } + +}