Bael 845 transfer queue (#1697)
* BAEL-845 tranferqueue article * BAEL-845 m to m example * BAEL-845 move code to test * BAEL-845 use tryTransfer * BAEL-845 proper if logic * BAEL-845 proper test * BAEL-845 robust test
This commit is contained in:
parent
76673a33f1
commit
2648a4ec25
@ -1,11 +1,13 @@
|
|||||||
package com.baeldung.transferqueue;
|
package com.baeldung.transferqueue;
|
||||||
|
|
||||||
import java.util.concurrent.TransferQueue;
|
import java.util.concurrent.TransferQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class Consumer implements Runnable {
|
public class Consumer implements Runnable {
|
||||||
private final TransferQueue<String> transferQueue;
|
private final TransferQueue<String> transferQueue;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final int numberOfMessagesToConsume;
|
private final int numberOfMessagesToConsume;
|
||||||
|
public final AtomicInteger numberOfConsumedMessages = new AtomicInteger();
|
||||||
|
|
||||||
public Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
|
public Consumer(TransferQueue<String> transferQueue, String name, int numberOfMessagesToConsume) {
|
||||||
this.transferQueue = transferQueue;
|
this.transferQueue = transferQueue;
|
||||||
@ -28,6 +30,7 @@ public class Consumer implements Runnable {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private void longProcessing(String element) throws InterruptedException {
|
private void longProcessing(String element) throws InterruptedException {
|
||||||
Thread.sleep(1_000);
|
numberOfConsumedMessages.incrementAndGet();
|
||||||
|
Thread.sleep(500);
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -2,11 +2,13 @@ package com.baeldung.transferqueue;
|
|||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.TransferQueue;
|
import java.util.concurrent.TransferQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
public class Producer implements Runnable {
|
public class Producer implements Runnable {
|
||||||
private final TransferQueue<String> transferQueue;
|
private final TransferQueue<String> transferQueue;
|
||||||
private final String name;
|
private final String name;
|
||||||
private final Integer numberOfMessagesToProduce;
|
private final Integer numberOfMessagesToProduce;
|
||||||
|
public final AtomicInteger numberOfProducedMessages = new AtomicInteger();
|
||||||
|
|
||||||
public Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
|
public Producer(TransferQueue<String> transferQueue, String name, Integer numberOfMessagesToProduce) {
|
||||||
this.transferQueue = transferQueue;
|
this.transferQueue = transferQueue;
|
||||||
@ -23,6 +25,7 @@ public class Producer implements Runnable {
|
|||||||
if (!added) {
|
if (!added) {
|
||||||
System.out.println("can not add an element due to the timeout");
|
System.out.println("can not add an element due to the timeout");
|
||||||
} else {
|
} else {
|
||||||
|
numberOfProducedMessages.incrementAndGet();
|
||||||
System.out.println("Producer: " + name + " transferred element: A" + i);
|
System.out.println("Producer: " + name + " transferred element: A" + i);
|
||||||
}
|
}
|
||||||
} catch (InterruptedException e) {
|
} catch (InterruptedException e) {
|
||||||
|
@ -1,9 +1,14 @@
|
|||||||
package com.baeldung.transferqueue;
|
package com.baeldung.transferqueue;
|
||||||
|
|
||||||
|
import org.junit.FixMethodOrder;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.runners.MethodSorters;
|
||||||
|
|
||||||
import java.util.concurrent.*;
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertEquals;
|
||||||
|
|
||||||
|
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||||
public class TransferQueueTest {
|
public class TransferQueueTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -25,6 +30,9 @@ public class TransferQueueTest {
|
|||||||
//then
|
//then
|
||||||
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
|
exService.awaitTermination(10_000, TimeUnit.MILLISECONDS);
|
||||||
exService.shutdown();
|
exService.shutdown();
|
||||||
|
|
||||||
|
assertEquals(producer1.numberOfProducedMessages.intValue(), 3);
|
||||||
|
assertEquals(producer2.numberOfProducedMessages.intValue(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -42,6 +50,9 @@ public class TransferQueueTest {
|
|||||||
//then
|
//then
|
||||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||||
exService.shutdown();
|
exService.shutdown();
|
||||||
|
|
||||||
|
assertEquals(producer.numberOfProducedMessages.intValue(), 3);
|
||||||
|
assertEquals(consumer.numberOfConsumedMessages.intValue(), 3);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -57,6 +68,7 @@ public class TransferQueueTest {
|
|||||||
//then
|
//then
|
||||||
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
exService.awaitTermination(5000, TimeUnit.MILLISECONDS);
|
||||||
exService.shutdown();
|
exService.shutdown();
|
||||||
}
|
|
||||||
|
|
||||||
|
assertEquals(producer.numberOfProducedMessages.intValue(), 0);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user