diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java index 801dc4e35c..052136bfe4 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java @@ -8,15 +8,17 @@ public class BlockingQueueUsage { int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); + int poisonPill = Integer.MAX_VALUE; + int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue<>(BOUND); for (int i = 0; i < N_PRODUCERS; i++) { - new Thread(new NumbersProducer(queue)).start(); + new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { - new Thread(new NumbersConsumer(queue)).start(); + new Thread(new NumbersConsumer(queue, poisonPill)).start(); } } } \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java index 774c263a3b..13fe0c3126 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java @@ -4,15 +4,20 @@ import java.util.concurrent.BlockingQueue; public class NumbersConsumer implements Runnable { private final BlockingQueue queue; + private final int poisonPill; - public NumbersConsumer(BlockingQueue queue) { + public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; + this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); + if (number.equals(poisonPill)) { + return; + } String result = number.toString(); System.out.println(Thread.currentThread().getName() + " result: " + result); } diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java index eb927c8b37..b262097c63 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java @@ -6,9 +6,13 @@ import java.util.concurrent.ThreadLocalRandom; public class NumbersProducer implements Runnable { private final BlockingQueue numbersQueue; + private final int poisonPill; + private final int poisonPillPerProducer; - public NumbersProducer(BlockingQueue numbersQueue) { + public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; + this.poisonPill = poisonPill; + this.poisonPillPerProducer = poisonPillPerProducer; } public void run() { @@ -23,5 +27,8 @@ public class NumbersProducer implements Runnable { for (int i = 0; i < 100; i++) { numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } + for (int j = 0; j < poisonPillPerProducer; j++) { + numbersQueue.put(poisonPill); + } } } \ No newline at end of file