BAEL-845 transfer queue (#1676)

* BAEL-845 tranferqueue article

* BAEL-845 m to m example

* BAEL-845 move code to test

* BAEL-845 use tryTransfer

* BAEL-845 proper if logic
This commit is contained in:
Tomasz Lelek 2017-04-20 02:25:42 +02:00 committed by pedja4
parent aa08d1262b
commit e37ab29a42
3 changed files with 128 additions and 0 deletions

View File

@ -0,0 +1,33 @@
package com.baeldung.transferqueue;
import java.util.concurrent.TransferQueue;
public class Consumer implements Runnable {
private final TransferQueue<String> transferQueue;
private final String name;
private final int numberOfMessagesToConsume;
public Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToConsume = numberOfMessagesToConsume;
}
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToConsume; i++) {
try {
System.out.println("Consumer: " + name + " is waiting to take element...");
String element = transferQueue.take();
longProcessing(element);
System.out.println("Consumer: " + name + " received element: " + element);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void longProcessing(String element) throws InterruptedException {
Thread.sleep(1_000);
}
}

View File

@ -0,0 +1,33 @@
package com.baeldung.transferqueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TransferQueue;
public class Producer implements Runnable {
private final TransferQueue<String> transferQueue;
private final String name;
private final Integer numberOfMessagesToProduce;
public Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
this.transferQueue = transferQueue;
this.name = name;
this.numberOfMessagesToProduce = numberOfMessagesToProduce;
}
@Override
public void run() {
for (int i = 0; i < numberOfMessagesToProduce; i++) {
try {
System.out.println("Producer: " + name + " is waiting to transfer...");
boolean added = transferQueue.tryTransfer("A" + i, 4000, TimeUnit.MILLISECONDS);
if (!added) {
System.out.println("can not add an element due to the timeout");
} else {
System.out.println("Producer: " + name + " transferred element: A" + i);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}

View File

@ -0,0 +1,62 @@
package com.baeldung.transferqueue;
import org.junit.Test;
import java.util.concurrent.*;
public class TransferQueueTest {
@Test
public void givenTransferQueue_whenUseMultipleConsumersAndMultipleProducers_thenShouldProcessAllMessages() throws InterruptedException {
//given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(3);
Producer producer1 = new Producer(transferQueue, "1", 3);
Producer producer2 = new Producer(transferQueue, "2", 3);
Consumer consumer1 = new Consumer(transferQueue, "1", 3);
Consumer consumer2 = new Consumer(transferQueue, "2", 3);
//when
exService.execute(producer1);
exService.execute(producer2);
exService.execute(consumer1);
exService.execute(consumer2);
//then
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
@Test
public void givenTransferQueue_whenUseOneConsumerAndOneProducer_thenShouldProcessAllMessages() throws InterruptedException {
//given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
Consumer consumer = new Consumer(transferQueue, "1", 3);
//when
exService.execute(producer);
exService.execute(consumer);
//then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
@Test
public void givenTransferQueue_whenUseOneProducerAndNoConsumers_thenShouldFailWithTimeout() throws InterruptedException {
//given
TransferQueue<String> transferQueue = new LinkedTransferQueue<>();
ExecutorService exService = Executors.newFixedThreadPool(2);
Producer producer = new Producer(transferQueue, "1", 3);
//when
exService.execute(producer);
//then
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
exService.shutdown();
}
}