From bccc9c836e87bf20a6662c9c9d206949bed7bd04 Mon Sep 17 00:00:00 2001 From: Dhrubajyoti Bhattacharjee Date: Fri, 15 May 2020 22:17:08 +0200 Subject: [PATCH] BAEL-3858 Java Exchanger introduction (#9000) --- .../ExchangerPipeLineManualTest.java | 83 +++++++++++++++++++ .../baeldung/exchanger/ExchangerUnitTest.java | 63 ++++++++++++++ 2 files changed, 146 insertions(+) create mode 100644 core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerPipeLineManualTest.java create mode 100644 core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerUnitTest.java diff --git a/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerPipeLineManualTest.java b/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerPipeLineManualTest.java new file mode 100644 index 0000000000..093580654b --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerPipeLineManualTest.java @@ -0,0 +1,83 @@ +package com.baeldung.exchanger; + +import java.util.Queue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Exchanger; +import java.util.concurrent.ExecutionException; +import org.junit.Test; + +import static java.util.concurrent.CompletableFuture.runAsync; + + + +public class ExchangerPipeLineManualTest { + + private static final int BUFFER_SIZE = 100; + + @Test + public void givenData_whenPassedThrough_thenCorrect() throws InterruptedException, ExecutionException { + + Exchanger> readerExchanger = new Exchanger<>(); + Exchanger> writerExchanger = new Exchanger<>(); + int counter = 0; + + Runnable reader = () -> { + Queue readerBuffer = new ConcurrentLinkedQueue<>(); + while (true) { + readerBuffer.add(UUID.randomUUID().toString()); + if (readerBuffer.size() >= BUFFER_SIZE) { + try { + readerBuffer = readerExchanger.exchange(readerBuffer); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + }; + + Runnable processor = () -> { + Queue processorBuffer = new ConcurrentLinkedQueue<>(); + Queue writterBuffer = new ConcurrentLinkedQueue<>(); + try { + processorBuffer = readerExchanger.exchange(processorBuffer); + while (true) { + writterBuffer.add(processorBuffer.poll()); + if (processorBuffer.isEmpty()) { + try { + processorBuffer = readerExchanger.exchange(processorBuffer); + writterBuffer = writerExchanger.exchange(writterBuffer); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + Runnable writer = () -> { + Queue writterBuffer = new ConcurrentLinkedQueue<>(); + try { + writterBuffer = writerExchanger.exchange(writterBuffer); + while (true) { + System.out.println(writterBuffer.poll()); + if (writterBuffer.isEmpty()) { + writterBuffer = writerExchanger.exchange(writterBuffer); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + CompletableFuture.allOf(runAsync(reader), runAsync(processor), runAsync(writer)).get(); + } + +} diff --git a/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerUnitTest.java b/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerUnitTest.java new file mode 100644 index 0000000000..ec567a3563 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/exchanger/ExchangerUnitTest.java @@ -0,0 +1,63 @@ +package com.baeldung.exchanger; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.Exchanger; + +import java.util.concurrent.ExecutionException; +import org.junit.Test; + +import static java.util.concurrent.CompletableFuture.runAsync; + +public class ExchangerUnitTest { + + + @Test + public void givenThreads_whenMessageExchanged_thenCorrect() { + Exchanger exchanger = new Exchanger<>(); + + Runnable taskA = () -> { + try { + String message = exchanger.exchange("from A"); + assertEquals("from B", message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + Runnable taskB = () -> { + try { + String message = exchanger.exchange("from B"); + assertEquals("from A", message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + CompletableFuture.allOf(runAsync(taskA), runAsync(taskB)).join(); + } + + @Test + public void givenThread_WhenExchangedMessage_thenCorrect() throws InterruptedException, ExecutionException { + Exchanger exchanger = new Exchanger<>(); + + Runnable runner = () -> { + try { + String message = exchanger.exchange("from runner"); + assertEquals("to runner", message); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + }; + + CompletableFuture result = CompletableFuture.runAsync(runner); + String msg = exchanger.exchange("to runner"); + assertEquals("from runner", msg); + result.join(); + } + +}