From 3c5aaa6b09c0db78b17681c5a1df3bde8456eab1 Mon Sep 17 00:00:00 2001 From: Seshu Kumar T Date: Tue, 15 Mar 2022 19:27:51 +0530 Subject: [PATCH] Producer consumer problem (#11781) * Producer consumer problem * Simplified producer consumer added Co-authored-by: Seshu Thanneeru --- .../baeldung/producerconsumer/Consumer.java | 51 ++++++++++++++ .../baeldung/producerconsumer/DataQueue.java | 59 ++++++++++++++++ .../baeldung/producerconsumer/Message.java | 27 ++++++++ .../baeldung/producerconsumer/Producer.java | 53 ++++++++++++++ .../ProducerConsumerDemonstrator.java | 69 +++++++++++++++++++ .../SimpleProducerConsumerDemonstrator.java | 60 ++++++++++++++++ .../baeldung/producerconsumer/ThreadUtil.java | 24 +++++++ 7 files changed, 343 insertions(+) create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Message.java create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ProducerConsumerDemonstrator.java create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/SimpleProducerConsumerDemonstrator.java create mode 100644 core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ThreadUtil.java diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java new file mode 100644 index 0000000000..5a059b74df --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Consumer.java @@ -0,0 +1,51 @@ +package com.baeldung.producerconsumer; + +public class Consumer implements Runnable { + private final DataQueue dataQueue; + private volatile boolean runFlag; + + public Consumer(DataQueue dataQueue) { + this.dataQueue = dataQueue; + runFlag = true; + } + + @Override + public void run() { + consume(); + } + + public void consume() { + while (runFlag) { + Message message; + if (dataQueue.isEmpty()) { + try { + dataQueue.waitOnEmpty(); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + if (!runFlag) { + break; + } + message = dataQueue.remove(); + dataQueue.notifyAllForFull(); + useMessage(message); + } + System.out.println("Consumer Stopped"); + } + + private void useMessage(Message message) { + if (message != null) { + System.out.printf("[%s] Consuming Message. Id: %d, Data: %f\n", Thread.currentThread().getName(), message.getId(), message.getData()); + + //Sleeping on random time to make it realistic + ThreadUtil.sleep((long) (message.getData() * 100)); + } + } + + public void stop() { + runFlag = false; + dataQueue.notifyAllForEmpty(); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java new file mode 100644 index 0000000000..6ab4fa2bc3 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/DataQueue.java @@ -0,0 +1,59 @@ +package com.baeldung.producerconsumer; + +import java.util.LinkedList; +import java.util.Queue; + +public class DataQueue { + private final Queue queue = new LinkedList<>(); + private final int maxSize; + private final Object FULL_QUEUE = new Object(); + private final Object EMPTY_QUEUE = new Object(); + + DataQueue(int maxSize) { + this.maxSize = maxSize; + } + + public boolean isFull() { + return queue.size() == maxSize; + } + + public boolean isEmpty() { + return queue.isEmpty(); + } + + public void waitOnFull() throws InterruptedException { + synchronized (FULL_QUEUE) { + FULL_QUEUE.wait(); + } + } + + public void waitOnEmpty() throws InterruptedException { + synchronized (EMPTY_QUEUE) { + EMPTY_QUEUE.wait(); + } + } + + public void notifyAllForFull() { + synchronized (FULL_QUEUE) { + FULL_QUEUE.notifyAll(); + } + } + + public void notifyAllForEmpty() { + synchronized (EMPTY_QUEUE) { + EMPTY_QUEUE.notifyAll(); + } + } + + public void add(Message message) { + synchronized (queue) { + queue.add(message); + } + } + + public Message remove() { + synchronized (queue) { + return queue.poll(); + } + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Message.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Message.java new file mode 100644 index 0000000000..48f6e986df --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Message.java @@ -0,0 +1,27 @@ +package com.baeldung.producerconsumer; + +public class Message { + private int id; + private double data; + + public Message(int id, double data) { + this.id = id; + this.data = data; + } + + public int getId() { + return id; + } + + public void setId(int id) { + this.id = id; + } + + public double getData() { + return data; + } + + public void setData(double data) { + this.data = data; + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java new file mode 100644 index 0000000000..80d693bd97 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/Producer.java @@ -0,0 +1,53 @@ +package com.baeldung.producerconsumer; + +public class Producer implements Runnable { + private final DataQueue dataQueue; + private volatile boolean runFlag; + + private static int idSequence = 0; + + public Producer(DataQueue dataQueue) { + this.dataQueue = dataQueue; + runFlag = true; + } + + @Override + public void run() { + produce(); + } + + public void produce() { + while (runFlag) { + Message message = generateMessage(); + while (dataQueue.isFull()) { + try { + dataQueue.waitOnFull(); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + } + if (!runFlag) { + break; + } + dataQueue.add(message); + dataQueue.notifyAllForEmpty(); + } + System.out.println("Producer Stopped"); + } + + private Message generateMessage() { + Message message = new Message(++idSequence, Math.random()); + System.out.printf("[%s] Generated Message. Id: %d, Data: %f\n", Thread.currentThread().getName(), message.getId(), message.getData()); + + //Sleeping on random time to make it realistic + ThreadUtil.sleep((long) (message.getData() * 100)); + + return message; + } + + public void stop() { + runFlag = false; + dataQueue.notifyAllForFull(); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ProducerConsumerDemonstrator.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ProducerConsumerDemonstrator.java new file mode 100644 index 0000000000..96d7b9f865 --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ProducerConsumerDemonstrator.java @@ -0,0 +1,69 @@ +package com.baeldung.producerconsumer; + +import java.util.ArrayList; +import java.util.List; + +import static com.baeldung.producerconsumer.ThreadUtil.*; + +public class ProducerConsumerDemonstrator { + private static final int MAX_QUEUE_CAPACITY = 5; + + public static void demoSingleProducerAndSingleConsumer() { + DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY); + + Producer producer = new Producer(dataQueue); + Thread producerThread = new Thread(producer); + + Consumer consumer = new Consumer(dataQueue); + Thread consumerThread = new Thread(consumer); + + producerThread.start(); + consumerThread.start(); + + List threads = new ArrayList<>(); + threads.add(producerThread); + threads.add(consumerThread); + + // let threads run for two seconds + sleep(2000); + + // Stop threads + producer.stop(); + consumer.stop(); + + waitForAllThreadsToComplete(threads); + } + + public static void demoMultipleProducersAndMultipleConsumers() { + DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY); + int producerCount = 3; + int consumerCount = 3; + List threads = new ArrayList<>(); + Producer producer = new Producer(dataQueue); + for(int i = 0; i < producerCount; i++) { + Thread producerThread = new Thread(producer); + producerThread.start(); + threads.add(producerThread); + } + Consumer consumer = new Consumer(dataQueue); + for(int i = 0; i < consumerCount; i++) { + Thread consumerThread = new Thread(consumer); + consumerThread.start(); + threads.add(consumerThread); + } + + // let threads run for two seconds + sleep(2000); + + // Stop threads + producer.stop(); + consumer.stop(); + + waitForAllThreadsToComplete(threads); + } + + public static void main(String[] args) { + demoSingleProducerAndSingleConsumer(); + demoMultipleProducersAndMultipleConsumers(); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/SimpleProducerConsumerDemonstrator.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/SimpleProducerConsumerDemonstrator.java new file mode 100644 index 0000000000..f1f6e1cc9c --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/SimpleProducerConsumerDemonstrator.java @@ -0,0 +1,60 @@ +package com.baeldung.producerconsumer; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingDeque; + +import static com.baeldung.producerconsumer.ThreadUtil.sleep; + +public class SimpleProducerConsumerDemonstrator { + BlockingQueue blockingQueue = new LinkedBlockingDeque<>(5); + + private void produce() { + while (true) { + double value = generateValue(); + try { + blockingQueue.put(value); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + System.out.printf("[%s] Value produced: %f\n", Thread.currentThread().getName(), value); + } + } + + private void consume() { + while (true) { + Double value; + try { + value = blockingQueue.take(); + } catch (InterruptedException e) { + e.printStackTrace(); + break; + } + // Consume value + System.out.printf("[%s] Value consumed: %f\n", Thread.currentThread().getName(), value); + } + } + + private double generateValue() { + return Math.random(); + } + + private void runProducerConsumer() { + for (int i = 0; i < 2; i++) { + Thread producerThread = new Thread(this::produce); + producerThread.start(); + } + + for (int i = 0; i < 3; i++) { + Thread consumerThread = new Thread(this::consume); + consumerThread.start(); + } + } + + public static void main(String[] args) { + SimpleProducerConsumerDemonstrator simpleProducerConsumerDemonstrator = new SimpleProducerConsumerDemonstrator(); + simpleProducerConsumerDemonstrator.runProducerConsumer(); + sleep(2000); + System.exit(0); + } +} diff --git a/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ThreadUtil.java b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ThreadUtil.java new file mode 100644 index 0000000000..e49a9019fd --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-4/src/main/java/com/baeldung/producerconsumer/ThreadUtil.java @@ -0,0 +1,24 @@ +package com.baeldung.producerconsumer; + +import java.util.List; + +public class ThreadUtil { + public static void waitForAllThreadsToComplete(List threads) { + for(Thread thread: threads) { + try { + thread.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } + + public static void sleep(long interval) { + try { + // Wait for some time to demonstrate threads + Thread.sleep(interval); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +}