From 2c0494dd03a08ecfb6bcd7657c8de4d915f986c7 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sat, 28 Jan 2017 10:45:57 +0100 Subject: [PATCH] BAEL-614 introduced poison pill message to stop our producers form run indefinitely --- .../concurrent/blockingqueue/BlockingQueueUsage.java | 6 ++++-- .../concurrent/blockingqueue/NumbersConsumer.java | 7 ++++++- .../concurrent/blockingqueue/NumbersProducer.java | 9 ++++++++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java index 63c6cc4460..8c53ffd955 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java @@ -9,16 +9,18 @@ public class BlockingQueueUsage { int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); + int poisonPill = Integer.MAX_VALUE; + int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue<>(BOUND); for (int i = 0; i < N_PRODUCERS; i++) { - new Thread(new NumbersProducer(queue)).start(); + new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { - new Thread(new NumbersConsumer(queue)).start(); + new Thread(new NumbersConsumer(queue, poisonPill)).start(); } } } \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java index 110a801acc..44e3b805e5 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java @@ -5,15 +5,20 @@ import java.util.concurrent.BlockingQueue; class NumbersConsumer implements Runnable { private final BlockingQueue queue; + private final int poisonPill; - public NumbersConsumer(BlockingQueue queue) { + public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; + this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); + if (number.equals(poisonPill)) { + return; + } String result = number.toString(); System.out.println(Thread.currentThread().getName() + " result: " + result); diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java index c9e91cd077..65f869c2f4 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java @@ -8,10 +8,14 @@ import java.util.concurrent.BlockingQueue; class NumbersProducer implements Runnable { private final Random random = new Random(); private final BlockingQueue numbersQueue; + private final int poisonPill; + private final int poisonPillPerProducer; - public NumbersProducer(BlockingQueue numbersQueue) { + public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; + this.poisonPill = poisonPill; + this.poisonPillPerProducer = poisonPillPerProducer; } @@ -27,5 +31,8 @@ class NumbersProducer implements Runnable { for (int i = 0; i < 100; i++) { numbersQueue.put(random.nextInt(100)); } + for (int j = 0; j < poisonPillPerProducer; j++) { + numbersQueue.put(poisonPill); + } } } \ No newline at end of file