From 2c0494dd03a08ecfb6bcd7657c8de4d915f986c7 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sat, 28 Jan 2017 10:45:57 +0100 Subject: [PATCH 1/3] BAEL-614 introduced poison pill message to stop our producers form run indefinitely --- .../concurrent/blockingqueue/BlockingQueueUsage.java | 6 ++++-- .../concurrent/blockingqueue/NumbersConsumer.java | 7 ++++++- .../concurrent/blockingqueue/NumbersProducer.java | 9 ++++++++- 3 files changed, 18 insertions(+), 4 deletions(-) diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java index 63c6cc4460..8c53ffd955 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/BlockingQueueUsage.java @@ -9,16 +9,18 @@ public class BlockingQueueUsage { int BOUND = 10; int N_PRODUCERS = 4; int N_CONSUMERS = Runtime.getRuntime().availableProcessors(); + int poisonPill = Integer.MAX_VALUE; + int poisonPillPerProducer = N_CONSUMERS / N_PRODUCERS; BlockingQueue queue = new LinkedBlockingQueue<>(BOUND); for (int i = 0; i < N_PRODUCERS; i++) { - new Thread(new NumbersProducer(queue)).start(); + new Thread(new NumbersProducer(queue, poisonPill, poisonPillPerProducer)).start(); } for (int j = 0; j < N_CONSUMERS; j++) { - new Thread(new NumbersConsumer(queue)).start(); + new Thread(new NumbersConsumer(queue, poisonPill)).start(); } } } \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java index 110a801acc..44e3b805e5 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersConsumer.java @@ -5,15 +5,20 @@ import java.util.concurrent.BlockingQueue; class NumbersConsumer implements Runnable { private final BlockingQueue queue; + private final int poisonPill; - public NumbersConsumer(BlockingQueue queue) { + public NumbersConsumer(BlockingQueue queue, int poisonPill) { this.queue = queue; + this.poisonPill = poisonPill; } public void run() { try { while (true) { Integer number = queue.take(); + if (number.equals(poisonPill)) { + return; + } String result = number.toString(); System.out.println(Thread.currentThread().getName() + " result: " + result); diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java index c9e91cd077..65f869c2f4 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java @@ -8,10 +8,14 @@ import java.util.concurrent.BlockingQueue; class NumbersProducer implements Runnable { private final Random random = new Random(); private final BlockingQueue numbersQueue; + private final int poisonPill; + private final int poisonPillPerProducer; - public NumbersProducer(BlockingQueue numbersQueue) { + public NumbersProducer(BlockingQueue numbersQueue, int poisonPill, int poisonPillPerProducer) { this.numbersQueue = numbersQueue; + this.poisonPill = poisonPill; + this.poisonPillPerProducer = poisonPillPerProducer; } @@ -27,5 +31,8 @@ class NumbersProducer implements Runnable { for (int i = 0; i < 100; i++) { numbersQueue.put(random.nextInt(100)); } + for (int j = 0; j < poisonPillPerProducer; j++) { + numbersQueue.put(poisonPill); + } } } \ No newline at end of file From 0639b2cd6f5fe0680449acb67189853336f7fcd6 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Sat, 28 Jan 2017 10:49:35 +0100 Subject: [PATCH 2/3] BAEL-614 thread local random --- .../baeldung/concurrent/blockingqueue/NumbersProducer.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java index 65f869c2f4..9a5c9351ec 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java @@ -1,12 +1,11 @@ package com.baeldung.concurrent.blockingqueue; -import java.util.Random; import java.util.concurrent.BlockingQueue; +import java.util.concurrent.ThreadLocalRandom; class NumbersProducer implements Runnable { - private final Random random = new Random(); private final BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer; @@ -29,7 +28,7 @@ class NumbersProducer implements Runnable { private void generateNumbers() throws InterruptedException { for (int i = 0; i < 100; i++) { - numbersQueue.put(random.nextInt(100)); + numbersQueue.put(ThreadLocalRandom.current().nextInt(100)); } for (int j = 0; j < poisonPillPerProducer; j++) { numbersQueue.put(poisonPill); From d38930100d24f803259017a4a2c78aa2b0f9643e Mon Sep 17 00:00:00 2001 From: Predrag Maric Date: Sun, 29 Jan 2017 11:55:53 +0100 Subject: [PATCH 3/3] BAEL-614 Fixed compilation error in NumbersProducer --- .../com/baeldung/concurrent/blockingqueue/NumbersProducer.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java index 0587728b92..b262097c63 100644 --- a/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java +++ b/core-java/src/main/java/com/baeldung/concurrent/blockingqueue/NumbersProducer.java @@ -5,8 +5,6 @@ import java.util.concurrent.ThreadLocalRandom; public class NumbersProducer implements Runnable { -class NumbersProducer implements Runnable { - private final BlockingQueue numbersQueue; private final int poisonPill; private final int poisonPillPerProducer;