Producer consumer problem (#11781)

* Producer consumer problem

* Simplified producer consumer added

Co-authored-by: Seshu Thanneeru <seshukumar.thanneeru@thoughtdata.com>
This commit is contained in:
Seshu Kumar T 2022-03-15 19:27:51 +05:30 committed by GitHub
parent 7681cb2d65
commit 3c5aaa6b09
7 changed files with 343 additions and 0 deletions

View File

@ -0,0 +1,51 @@
package com.baeldung.producerconsumer;
public class Consumer implements Runnable {
private final DataQueue dataQueue;
private volatile boolean runFlag;
public Consumer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
runFlag = true;
}
@Override
public void run() {
consume();
}
public void consume() {
while (runFlag) {
Message message;
if (dataQueue.isEmpty()) {
try {
dataQueue.waitOnEmpty();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
if (!runFlag) {
break;
}
message = dataQueue.remove();
dataQueue.notifyAllForFull();
useMessage(message);
}
System.out.println("Consumer Stopped");
}
private void useMessage(Message message) {
if (message != null) {
System.out.printf("[%s] Consuming Message. Id: %d, Data: %f\n", Thread.currentThread().getName(), message.getId(), message.getData());
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (message.getData() * 100));
}
}
public void stop() {
runFlag = false;
dataQueue.notifyAllForEmpty();
}
}

View File

@ -0,0 +1,59 @@
package com.baeldung.producerconsumer;
import java.util.LinkedList;
import java.util.Queue;
public class DataQueue {
private final Queue<Message> queue = new LinkedList<>();
private final int maxSize;
private final Object FULL_QUEUE = new Object();
private final Object EMPTY_QUEUE = new Object();
DataQueue(int maxSize) {
this.maxSize = maxSize;
}
public boolean isFull() {
return queue.size() == maxSize;
}
public boolean isEmpty() {
return queue.isEmpty();
}
public void waitOnFull() throws InterruptedException {
synchronized (FULL_QUEUE) {
FULL_QUEUE.wait();
}
}
public void waitOnEmpty() throws InterruptedException {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.wait();
}
}
public void notifyAllForFull() {
synchronized (FULL_QUEUE) {
FULL_QUEUE.notifyAll();
}
}
public void notifyAllForEmpty() {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.notifyAll();
}
}
public void add(Message message) {
synchronized (queue) {
queue.add(message);
}
}
public Message remove() {
synchronized (queue) {
return queue.poll();
}
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.producerconsumer;
public class Message {
private int id;
private double data;
public Message(int id, double data) {
this.id = id;
this.data = data;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public double getData() {
return data;
}
public void setData(double data) {
this.data = data;
}
}

View File

@ -0,0 +1,53 @@
package com.baeldung.producerconsumer;
public class Producer implements Runnable {
private final DataQueue dataQueue;
private volatile boolean runFlag;
private static int idSequence = 0;
public Producer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
runFlag = true;
}
@Override
public void run() {
produce();
}
public void produce() {
while (runFlag) {
Message message = generateMessage();
while (dataQueue.isFull()) {
try {
dataQueue.waitOnFull();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
if (!runFlag) {
break;
}
dataQueue.add(message);
dataQueue.notifyAllForEmpty();
}
System.out.println("Producer Stopped");
}
private Message generateMessage() {
Message message = new Message(++idSequence, Math.random());
System.out.printf("[%s] Generated Message. Id: %d, Data: %f\n", Thread.currentThread().getName(), message.getId(), message.getData());
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (message.getData() * 100));
return message;
}
public void stop() {
runFlag = false;
dataQueue.notifyAllForFull();
}
}

View File

@ -0,0 +1,69 @@
package com.baeldung.producerconsumer;
import java.util.ArrayList;
import java.util.List;
import static com.baeldung.producerconsumer.ThreadUtil.*;
public class ProducerConsumerDemonstrator {
private static final int MAX_QUEUE_CAPACITY = 5;
public static void demoSingleProducerAndSingleConsumer() {
DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);
Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);
Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);
producerThread.start();
consumerThread.start();
List<Thread> threads = new ArrayList<>();
threads.add(producerThread);
threads.add(consumerThread);
// let threads run for two seconds
sleep(2000);
// Stop threads
producer.stop();
consumer.stop();
waitForAllThreadsToComplete(threads);
}
public static void demoMultipleProducersAndMultipleConsumers() {
DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);
int producerCount = 3;
int consumerCount = 3;
List<Thread> threads = new ArrayList<>();
Producer producer = new Producer(dataQueue);
for(int i = 0; i < producerCount; i++) {
Thread producerThread = new Thread(producer);
producerThread.start();
threads.add(producerThread);
}
Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) {
Thread consumerThread = new Thread(consumer);
consumerThread.start();
threads.add(consumerThread);
}
// let threads run for two seconds
sleep(2000);
// Stop threads
producer.stop();
consumer.stop();
waitForAllThreadsToComplete(threads);
}
public static void main(String[] args) {
demoSingleProducerAndSingleConsumer();
demoMultipleProducersAndMultipleConsumers();
}
}

View File

@ -0,0 +1,60 @@
package com.baeldung.producerconsumer;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;
import static com.baeldung.producerconsumer.ThreadUtil.sleep;
public class SimpleProducerConsumerDemonstrator {
BlockingQueue<Double> blockingQueue = new LinkedBlockingDeque<>(5);
private void produce() {
while (true) {
double value = generateValue();
try {
blockingQueue.put(value);
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
System.out.printf("[%s] Value produced: %f\n", Thread.currentThread().getName(), value);
}
}
private void consume() {
while (true) {
Double value;
try {
value = blockingQueue.take();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
// Consume value
System.out.printf("[%s] Value consumed: %f\n", Thread.currentThread().getName(), value);
}
}
private double generateValue() {
return Math.random();
}
private void runProducerConsumer() {
for (int i = 0; i < 2; i++) {
Thread producerThread = new Thread(this::produce);
producerThread.start();
}
for (int i = 0; i < 3; i++) {
Thread consumerThread = new Thread(this::consume);
consumerThread.start();
}
}
public static void main(String[] args) {
SimpleProducerConsumerDemonstrator simpleProducerConsumerDemonstrator = new SimpleProducerConsumerDemonstrator();
simpleProducerConsumerDemonstrator.runProducerConsumer();
sleep(2000);
System.exit(0);
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.producerconsumer;
import java.util.List;
public class ThreadUtil {
public static void waitForAllThreadsToComplete(List<Thread> threads) {
for(Thread thread: threads) {
try {
thread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public static void sleep(long interval) {
try {
// Wait for some time to demonstrate threads
Thread.sleep(interval);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}