From 58f8e6163fc0bfc6c6ddbfb8d712fc930b11a0a4 Mon Sep 17 00:00:00 2001 From: ramansahasi Date: Fri, 1 Dec 2017 00:31:44 +0530 Subject: [PATCH] BAEL-1327 Java Threads: notify and wait (initial commit) (#3160) --- .../concurrent/waitandnotify/Data.java | 33 ++++++++++ .../waitandnotify/NetworkDriver.java | 12 ++++ .../concurrent/waitandnotify/Receiver.java | 25 +++++++ .../concurrent/waitandnotify/Sender.java | 30 +++++++++ .../waitandnotify/NetworkIntegrationTest.java | 65 +++++++++++++++++++ 5 files changed, 165 insertions(+) create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Data.java create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/NetworkDriver.java create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Receiver.java create mode 100644 core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Sender.java create mode 100644 core-java-concurrency/src/test/java/com/baeldung/concurrent/waitandnotify/NetworkIntegrationTest.java diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Data.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Data.java new file mode 100644 index 0000000000..9b850c4153 --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Data.java @@ -0,0 +1,33 @@ +package com.baeldung.concurrent.waitandnotify; + +public class Data { + private String packet; + + // True if receiver should wait + // False if sender should wait + private boolean transfer = true; + + public synchronized String receive() { + while (transfer) { + try { + wait(); + } catch (InterruptedException e) {} + } + transfer = true; + + notifyAll(); + return packet; + } + + public synchronized void send(String packet) { + while (!transfer) { + try { + wait(); + } catch (InterruptedException e) {} + } + transfer = false; + + this.packet = packet; + notifyAll(); + } +} \ No newline at end of file diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/NetworkDriver.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/NetworkDriver.java new file mode 100644 index 0000000000..d4fd1574c6 --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/NetworkDriver.java @@ -0,0 +1,12 @@ +package com.baeldung.concurrent.waitandnotify; + +public class NetworkDriver { + public static void main(String[] args) { + Data data = new Data(); + Thread sender = new Thread(new Sender(data)); + Thread receiver = new Thread(new Receiver(data)); + + sender.start(); + receiver.start(); + } +} \ No newline at end of file diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Receiver.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Receiver.java new file mode 100644 index 0000000000..63f48b8031 --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Receiver.java @@ -0,0 +1,25 @@ +package com.baeldung.concurrent.waitandnotify; + +import java.util.concurrent.ThreadLocalRandom; + +public class Receiver implements Runnable { + private Data load; + + public Receiver(Data load) { + this.load = load; + } + + public void run() { + for(String receivedMessage = load.receive(); + !"End".equals(receivedMessage) ; + receivedMessage = load.receive()) { + + System.out.println(receivedMessage); + + //Thread.sleep() to mimic heavy server-side processing + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + } catch (InterruptedException e) {} + } + } +} \ No newline at end of file diff --git a/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Sender.java b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Sender.java new file mode 100644 index 0000000000..b7d782c3f5 --- /dev/null +++ b/core-java-concurrency/src/main/java/com/baeldung/concurrent/waitandnotify/Sender.java @@ -0,0 +1,30 @@ +package com.baeldung.concurrent.waitandnotify; + +import java.util.concurrent.ThreadLocalRandom; + +public class Sender implements Runnable { + private Data data; + + public Sender(Data data) { + this.data = data; + } + + public void run() { + String packets[] = { + "First packet", + "Second packet", + "Third packet", + "Fourth packet", + "End" + }; + + for (String packet : packets) { + data.send(packet); + + //Thread.sleep() to mimic heavy server-side processing + try { + Thread.sleep(ThreadLocalRandom.current().nextInt(1000, 5000)); + } catch (InterruptedException e) {} + } + } +} \ No newline at end of file diff --git a/core-java-concurrency/src/test/java/com/baeldung/concurrent/waitandnotify/NetworkIntegrationTest.java b/core-java-concurrency/src/test/java/com/baeldung/concurrent/waitandnotify/NetworkIntegrationTest.java new file mode 100644 index 0000000000..49f4313e9d --- /dev/null +++ b/core-java-concurrency/src/test/java/com/baeldung/concurrent/waitandnotify/NetworkIntegrationTest.java @@ -0,0 +1,65 @@ +package com.baeldung.concurrent.waitandnotify; + +import static org.junit.Assert.assertEquals; + +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; +import java.io.PrintWriter; +import java.io.StringWriter; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +public class NetworkIntegrationTest { + + private final ByteArrayOutputStream outContent = new ByteArrayOutputStream(); + private final ByteArrayOutputStream errContent = new ByteArrayOutputStream(); + private String expected; + + @Before + public void setUpStreams() { + System.setOut(new PrintStream(outContent)); + System.setErr(new PrintStream(errContent)); + } + + @Before + public void setUpExpectedOutput() { + StringWriter expectedStringWriter = new StringWriter(); + + PrintWriter printWriter = new PrintWriter(expectedStringWriter); + printWriter.println("First packet"); + printWriter.println("Second packet"); + printWriter.println("Third packet"); + printWriter.println("Fourth packet"); + printWriter.close(); + + expected = expectedStringWriter.toString(); + } + + @After + public void cleanUpStreams() { + System.setOut(null); + System.setErr(null); + } + + @Test + public void givenSenderAndReceiver_whenSendingPackets_thenNetworkSynchronized() { + Data data = new Data(); + Thread sender = new Thread(new Sender(data)); + Thread receiver = new Thread(new Receiver(data)); + + sender.start(); + receiver.start(); + + //wait for sender and receiver to finish before we test against expected + try { + sender.join(); + receiver.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + + assertEquals(expected, outContent.toString()); + } +}