JAVA-23377 | fixing sync code block (#14574)

This commit is contained in:
Gaetano Piazzolla 2023-08-28 19:38:00 +02:00 committed by GitHub
parent c34697f899
commit 5b8de98ac3
3 changed files with 39 additions and 20 deletions

View File

@ -58,4 +58,8 @@ public class DataQueue {
return queue.poll(); return queue.poll();
} }
} }
public Integer getSize() {
return queue.size();
}
} }

View File

@ -1,12 +1,13 @@
package com.baeldung.producerconsumer; package com.baeldung.producerconsumer;
import java.util.concurrent.locks.ReentrantLock;
import java.util.logging.Logger; import java.util.logging.Logger;
public class Producer implements Runnable { public class Producer implements Runnable {
private static final Logger log = Logger.getLogger(Producer.class.getCanonicalName()); private static final Logger log = Logger.getLogger(Producer.class.getCanonicalName());
private final DataQueue dataQueue; private final DataQueue dataQueue;
private static int idSequence = 0; private static int idSequence = 0;
final ReentrantLock lock = new ReentrantLock();
public Producer(DataQueue dataQueue) { public Producer(DataQueue dataQueue) {
this.dataQueue = dataQueue; this.dataQueue = dataQueue;
@ -19,22 +20,38 @@ public class Producer implements Runnable {
public void produce() { public void produce() {
while (dataQueue.runFlag) { while (dataQueue.runFlag) {
while (dataQueue.isFull() && dataQueue.runFlag) {
try { try {
dataQueue.waitOnFull(); lock.lock();
} catch (InterruptedException e) {
e.printStackTrace(); while (dataQueue.isFull() && dataQueue.runFlag) {
try {
dataQueue.waitOnFull();
} catch (InterruptedException e) {
e.printStackTrace();
break;
}
}
if (!dataQueue.runFlag) {
break; break;
} }
}
if (!dataQueue.runFlag) {
break;
}
Message message = generateMessage();
dataQueue.add(message);
dataQueue.notifyAllForEmpty();
Message message = generateMessage();
dataQueue.add(message);
dataQueue.notifyAllForEmpty();
log.info("Size of the queue is: " + dataQueue.getSize());
}
finally{
lock.unlock();
}
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (Math.random() * 100));
} }
log.info("Producer Stopped"); log.info("Producer Stopped");
} }
@ -43,9 +60,6 @@ public class Producer implements Runnable {
log.info(String.format("[%s] Generated Message. Id: %d, Data: %f%n", log.info(String.format("[%s] Generated Message. Id: %d, Data: %f%n",
Thread.currentThread().getName(), message.getId(), message.getData())); Thread.currentThread().getName(), message.getId(), message.getData()));
//Sleeping on random time to make it realistic
ThreadUtil.sleep((long) (message.getData() * 100));
return message; return message;
} }

View File

@ -36,8 +36,8 @@ public class ProducerConsumerDemonstrator {
public static void demoMultipleProducersAndMultipleConsumers() { public static void demoMultipleProducersAndMultipleConsumers() {
DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY); DataQueue dataQueue = new DataQueue(MAX_QUEUE_CAPACITY);
int producerCount = 3; int producerCount = 5;
int consumerCount = 3; int consumerCount = 5;
List<Thread> threads = new ArrayList<>(); List<Thread> threads = new ArrayList<>();
Producer producer = new Producer(dataQueue); Producer producer = new Producer(dataQueue);
for(int i = 0; i < producerCount; i++) { for(int i = 0; i < producerCount; i++) {
@ -45,6 +45,7 @@ public class ProducerConsumerDemonstrator {
producerThread.start(); producerThread.start();
threads.add(producerThread); threads.add(producerThread);
} }
Consumer consumer = new Consumer(dataQueue); Consumer consumer = new Consumer(dataQueue);
for(int i = 0; i < consumerCount; i++) { for(int i = 0; i < consumerCount; i++) {
Thread consumerThread = new Thread(consumer); Thread consumerThread = new Thread(consumer);
@ -52,8 +53,8 @@ public class ProducerConsumerDemonstrator {
threads.add(consumerThread); threads.add(consumerThread);
} }
// let threads run for two seconds // let threads run for ten seconds
sleep(2000); sleep(10000);
// Stop threads // Stop threads
producer.stop(); producer.stop();