From c70ee03a2485d6d32125477530f5179caf351405 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Fri, 21 Apr 2017 00:31:35 +0200 Subject: [PATCH] BAEL-846 code for a syncrhonous queue article (#1699) * code for the unsafe article * more descriptive example * proper eng * better test name * free memory call * java 8 style * BAEL-814 Added call to freeMemory() in off-heap test * BAEL-846 code for a syncrhonous queue article * BAEL-814 Switched from Random to ThreadLocalRandom --- .../SynchronousQueueTest.java | 83 +++++++++++++++++++ 1 file changed, 83 insertions(+) create mode 100644 core-java/src/test/java/com/baeldung/synchronousqueue/SynchronousQueueTest.java diff --git a/core-java/src/test/java/com/baeldung/synchronousqueue/SynchronousQueueTest.java b/core-java/src/test/java/com/baeldung/synchronousqueue/SynchronousQueueTest.java new file mode 100644 index 0000000000..5d73e02e4f --- /dev/null +++ b/core-java/src/test/java/com/baeldung/synchronousqueue/SynchronousQueueTest.java @@ -0,0 +1,83 @@ +package com.baeldung.synchronousqueue; + +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; + +import static junit.framework.TestCase.assertEquals; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class SynchronousQueueTest { + + @Test + public void givenTwoThreads_whenWantToExchangeUsingLockGuardedVariable_thenItSucceed() throws InterruptedException { + //given + ExecutorService executor = Executors.newFixedThreadPool(2); + AtomicInteger sharedState = new AtomicInteger(); + CountDownLatch countDownLatch = new CountDownLatch(1); + + Runnable producer = () -> { + Integer producedElement = ThreadLocalRandom.current().nextInt(); + System.out.println("Saving an element: " + producedElement + " to the exchange point"); + sharedState.set(producedElement); + countDownLatch.countDown(); + }; + + Runnable consumer = () -> { + try { + countDownLatch.await(); + Integer consumedElement = sharedState.get(); + System.out.println("consumed an element: " + consumedElement + " from the exchange point"); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + }; + + //when + executor.execute(producer); + executor.execute(consumer); + + //then + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + executor.shutdown(); + assertEquals(countDownLatch.getCount(), 0); + } + + @Test + public void givenTwoThreads_whenWantToExchangeUsingSynchronousQueue_thenItSucceed() throws InterruptedException { + //given + ExecutorService executor = Executors.newFixedThreadPool(2); + final SynchronousQueue queue = new SynchronousQueue<>(); + + Runnable producer = () -> { + Integer producedElement = ThreadLocalRandom.current().nextInt(); + try { + System.out.println("Saving an element: " + producedElement + " to the exchange point"); + queue.put(producedElement); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + }; + + Runnable consumer = () -> { + try { + Integer consumedElement = queue.take(); + System.out.println("consumed an element: " + consumedElement + " from the exchange point"); + } catch (InterruptedException ex) { + ex.printStackTrace(); + } + }; + + //when + executor.execute(producer); + executor.execute(consumer); + + //then + executor.awaitTermination(500, TimeUnit.MILLISECONDS); + executor.shutdown(); + assertEquals(queue.size(), 0); + } +} \ No newline at end of file