JAVA-24659 Simplifying Logic of Consumer-Producer problem. (#14682)

* JAVA-24659 | moving runflag inside consumer and producer. it makes no sense to have it inside

* JAVA-24659 | using atomic integer for sequence, and moving thread.sleep() in the same place

* JAVA-24659 | enhanced logic for simplicity
This commit is contained in:
Gaetano Piazzolla 2023-09-08 15:44:38 +02:00 committed by GitHub
parent 071b34e376
commit cf1a561443
4 changed files with 78 additions and 83 deletions

View File

@ -4,6 +4,7 @@ import java.util.logging.Logger;
public class Consumer implements Runnable {
private static final Logger log = Logger.getLogger(Consumer.class.getCanonicalName());
private boolean running = false;
private final DataQueue dataQueue;
public Consumer(DataQueue dataQueue) {
@ -12,26 +13,36 @@ public class Consumer implements Runnable {
@Override
public void run() {
running = true;
consume();
}
public void stop() {
running = false;
}
public void consume() {
while (dataQueue.runFlag) {
while (dataQueue.isEmpty() && dataQueue.runFlag) {
while (running) {
if (dataQueue.isEmpty()) {
try {
dataQueue.waitOnEmpty();
dataQueue.waitIsNotEmpty();
} catch (InterruptedException e) {
e.printStackTrace();
log.severe("Error while waiting to Consume messages.");
break;
}
}
if (!dataQueue.runFlag) {
// avoid spurious wake-up
if (!running) {
break;
}
Message message = dataQueue.remove();
dataQueue.notifyAllForFull();
Message message = dataQueue.poll();
useMessage(message);
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (Math.random() * 100));
}
log.info("Consumer Stopped");
}
@ -40,14 +51,7 @@ public class Consumer implements Runnable {
if (message != null) {
log.info(String.format("[%s] Consuming Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(), message.getId(), message.getData()));
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (message.getData() * 100));
}
}
public void stop() {
dataQueue.runFlag = false;
dataQueue.notifyAllForEmpty();
}
}

View File

@ -6,10 +6,8 @@ import java.util.Queue;
public class DataQueue {
private final Queue<Message> queue = new LinkedList<>();
private final int maxSize;
private final Object FULL_QUEUE = new Object();
private final Object EMPTY_QUEUE = new Object();
public boolean runFlag = true;
private final Object IS_NOT_FULL = new Object();
private final Object IS_NOT_EMPTY = new Object();
DataQueue(int maxSize) {
this.maxSize = maxSize;
@ -23,43 +21,42 @@ public class DataQueue {
return queue.isEmpty();
}
public void waitOnFull() throws InterruptedException {
synchronized (FULL_QUEUE) {
FULL_QUEUE.wait();
public void waitIsNotFull() throws InterruptedException {
synchronized (IS_NOT_FULL) {
IS_NOT_FULL.wait();
}
}
public void waitOnEmpty() throws InterruptedException {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.wait();
}
}
public void notifyAllForFull() {
synchronized (FULL_QUEUE) {
FULL_QUEUE.notifyAll();
}
}
public void notifyAllForEmpty() {
synchronized (EMPTY_QUEUE) {
EMPTY_QUEUE.notifyAll();
public void waitIsNotEmpty() throws InterruptedException {
synchronized (IS_NOT_EMPTY) {
IS_NOT_EMPTY.wait();
}
}
public void add(Message message) {
synchronized (queue) {
queue.add(message);
}
queue.add(message);
notifyIsNotEmpty();
}
public Message remove() {
synchronized (queue) {
return queue.poll();
}
public Message poll() {
Message mess = queue.poll();
notifyIsNotFull();
return mess;
}
public Integer getSize() {
return queue.size();
}
private void notifyIsNotFull() {
synchronized (IS_NOT_FULL) {
IS_NOT_FULL.notify();
}
}
private void notifyIsNotEmpty() {
synchronized (IS_NOT_EMPTY) {
IS_NOT_EMPTY.notify();
}
}
}

View File

@ -1,13 +1,13 @@
package com.baeldung.producerconsumer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;
public class Producer implements Runnable {
private static final Logger log = Logger.getLogger(Producer.class.getCanonicalName());
private static final AtomicInteger idSequence = new AtomicInteger(0);
private boolean running = false;
private final DataQueue dataQueue;
private static int idSequence = 0;
final ReentrantLock lock = new ReentrantLock();
public Producer(DataQueue dataQueue) {
this.dataQueue = dataQueue;
@ -15,39 +15,36 @@ public class Producer implements Runnable {
@Override
public void run() {
running = true;
produce();
}
public void stop() {
running = false;
}
public void produce() {
while (dataQueue.runFlag) {
try {
lock.lock();
while (running) {
while (dataQueue.isFull() && dataQueue.runFlag) {
try {
dataQueue.waitOnFull();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
if (!dataQueue.runFlag) {
if (dataQueue.isFull()) {
try {
dataQueue.waitIsNotFull();
} catch (InterruptedException e) {
log.severe("Error while waiting to Produce messages.");
break;
}
Message message = generateMessage();
dataQueue.add(message);
dataQueue.notifyAllForEmpty();
log.info("Size of the queue is: " + dataQueue.getSize());
}
finally{
lock.unlock();
// avoid spurious wake-up
if (!running) {
break;
}
dataQueue.add(generateMessage());
log.info("Size of the queue is: " + dataQueue.getSize());
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (Math.random() * 100));
}
@ -56,19 +53,11 @@ public class Producer implements Runnable {
}
private Message generateMessage() {
Message message = new Message(incrementAndGetId(), Math.random());
Message message = new Message(idSequence.incrementAndGet(), Math.random());
log.info(String.format("[%s] Generated Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(), message.getId(), message.getData()));
return message;
}
private static int incrementAndGetId() {
return ++idSequence;
}
public void stop() {
dataQueue.runFlag = false;
dataQueue.notifyAllForFull();
}
}

View File

@ -27,7 +27,7 @@ public class ProducerConsumerDemonstrator {
// let threads run for two seconds
sleep(2000);
// Stop threads
// stop threads
producer.stop();
consumer.stop();
@ -39,26 +39,31 @@ public class ProducerConsumerDemonstrator {
int producerCount = 5;
int consumerCount = 5;
List<Thread> threads = new ArrayList<>();
Producer producer = new Producer(dataQueue);
List<Producer> producers = new ArrayList<>();
List<Consumer> consumers = new ArrayList<>();
for(int i = 0; i < producerCount; i++) {
Producer producer = new Producer(dataQueue);
Thread producerThread = new Thread(producer);
producerThread.start();
threads.add(producerThread);
producers.add(producer);
}
Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) {
Consumer consumer = new Consumer(dataQueue);
Thread consumerThread = new Thread(consumer);
consumerThread.start();
threads.add(consumerThread);
consumers.add(consumer);
}
// let threads run for ten seconds
sleep(10000);
// Stop threads
producer.stop();
consumer.stop();
// stop threads
consumers.forEach(Consumer::stop);
producers.forEach(Producer::stop);
waitForAllThreadsToComplete(threads);
}