Bael 882 delay queue (#1792)
* BAEL-882 Delay_queue article code * BAEL-882 Removed println * BAEL-882 formatting * BAEL-882 test ordered
This commit is contained in:
parent
075eeb1539
commit
ace7ccd55b
@ -0,0 +1,39 @@
|
|||||||
|
package com.baeldung.concurrent.delayqueue;
|
||||||
|
|
||||||
|
import java.util.concurrent.Delayed;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
|
public class DelayObject implements Delayed {
|
||||||
|
private String data;
|
||||||
|
private long startTime;
|
||||||
|
|
||||||
|
public DelayObject(String data, long delayInMilliseconds) {
|
||||||
|
this.data = data;
|
||||||
|
this.startTime = System.currentTimeMillis() + delayInMilliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public long getDelay(TimeUnit unit) {
|
||||||
|
long diff = startTime - System.currentTimeMillis();
|
||||||
|
return unit.convert(diff, TimeUnit.MILLISECONDS);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public int compareTo(Delayed o) {
|
||||||
|
if (this.startTime < ((DelayObject) o).startTime) {
|
||||||
|
return -1;
|
||||||
|
}
|
||||||
|
if (this.startTime > ((DelayObject) o).startTime) {
|
||||||
|
return 1;
|
||||||
|
}
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
return "{" +
|
||||||
|
"data='" + data + '\'' +
|
||||||
|
", startTime=" + startTime +
|
||||||
|
'}';
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,30 @@
|
|||||||
|
package com.baeldung.concurrent.delayqueue;
|
||||||
|
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
|
||||||
|
public class DelayQueueConsumer implements Runnable {
|
||||||
|
private BlockingQueue<DelayObject> queue;
|
||||||
|
private final Integer numberOfElementsToTake;
|
||||||
|
public final AtomicInteger numberOfConsumedElements = new AtomicInteger();
|
||||||
|
|
||||||
|
public DelayQueueConsumer(BlockingQueue<DelayObject> queue, Integer numberOfElementsToTake) {
|
||||||
|
this.queue = queue;
|
||||||
|
this.numberOfElementsToTake = numberOfElementsToTake;
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < numberOfElementsToTake; i++) {
|
||||||
|
try {
|
||||||
|
DelayObject object = queue.take();
|
||||||
|
numberOfConsumedElements.incrementAndGet();
|
||||||
|
System.out.println("Consumer take: " + object);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,34 @@
|
|||||||
|
package com.baeldung.concurrent.delayqueue;
|
||||||
|
|
||||||
|
import java.util.UUID;
|
||||||
|
import java.util.concurrent.BlockingQueue;
|
||||||
|
|
||||||
|
|
||||||
|
public class DelayQueueProducer implements Runnable {
|
||||||
|
private BlockingQueue<DelayObject> queue;
|
||||||
|
private final Integer numberOfElementsToProduce;
|
||||||
|
private final Integer delayOfEachProducedMessageMilliseconds;
|
||||||
|
|
||||||
|
public DelayQueueProducer(BlockingQueue<DelayObject> queue,
|
||||||
|
Integer numberOfElementsToProduce,
|
||||||
|
Integer delayOfEachProducedMessageMilliseconds) {
|
||||||
|
this.queue = queue;
|
||||||
|
this.numberOfElementsToProduce = numberOfElementsToProduce;
|
||||||
|
this.delayOfEachProducedMessageMilliseconds = delayOfEachProducedMessageMilliseconds;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
for (int i = 0; i < numberOfElementsToProduce; i++) {
|
||||||
|
DelayObject object
|
||||||
|
= new DelayObject(UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds);
|
||||||
|
System.out.println("Put object = " + object);
|
||||||
|
try {
|
||||||
|
queue.put(object);
|
||||||
|
Thread.sleep(500);
|
||||||
|
} catch (InterruptedException ie) {
|
||||||
|
ie.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,78 @@
|
|||||||
|
package com.baeldung.concurrent.delayqueue;
|
||||||
|
|
||||||
|
import org.junit.FixMethodOrder;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.runners.MethodSorters;
|
||||||
|
|
||||||
|
import java.util.concurrent.*;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.assertEquals;
|
||||||
|
|
||||||
|
@FixMethodOrder(MethodSorters.NAME_ASCENDING)
|
||||||
|
public class DelayQueueTest {
|
||||||
|
@Test
|
||||||
|
public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException {
|
||||||
|
//given
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||||
|
BlockingQueue<DelayObject> queue = new DelayQueue<>();
|
||||||
|
int numberOfElementsToProduce = 2;
|
||||||
|
int delayOfEachProducedMessageMilliseconds = 500;
|
||||||
|
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
|
||||||
|
DelayQueueProducer producer
|
||||||
|
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
|
||||||
|
|
||||||
|
//when
|
||||||
|
executor.submit(producer);
|
||||||
|
executor.submit(consumer);
|
||||||
|
|
||||||
|
//then
|
||||||
|
executor.awaitTermination(5, TimeUnit.SECONDS);
|
||||||
|
executor.shutdown();
|
||||||
|
assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenDelayQueue_whenProduceElementWithHugeDelay_thenConsumerWasNotAbleToConsumeMessageInGivenTime() throws InterruptedException {
|
||||||
|
//given
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||||
|
BlockingQueue<DelayObject> queue = new DelayQueue<>();
|
||||||
|
int numberOfElementsToProduce = 1;
|
||||||
|
int delayOfEachProducedMessageMilliseconds = 10_000;
|
||||||
|
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
|
||||||
|
DelayQueueProducer producer
|
||||||
|
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
|
||||||
|
|
||||||
|
//when
|
||||||
|
executor.submit(producer);
|
||||||
|
executor.submit(consumer);
|
||||||
|
|
||||||
|
//then
|
||||||
|
executor.awaitTermination(5, TimeUnit.SECONDS);
|
||||||
|
executor.shutdown();
|
||||||
|
assertEquals(consumer.numberOfConsumedElements.get(), 0);
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenDelayQueue_whenProduceElementWithNegativeDelay_thenConsumeMessageImmediately() throws InterruptedException {
|
||||||
|
//given
|
||||||
|
ExecutorService executor = Executors.newFixedThreadPool(2);
|
||||||
|
BlockingQueue<DelayObject> queue = new DelayQueue<>();
|
||||||
|
int numberOfElementsToProduce = 1;
|
||||||
|
int delayOfEachProducedMessageMilliseconds = -10_000;
|
||||||
|
DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce);
|
||||||
|
DelayQueueProducer producer
|
||||||
|
= new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds);
|
||||||
|
|
||||||
|
//when
|
||||||
|
executor.submit(producer);
|
||||||
|
executor.submit(consumer);
|
||||||
|
|
||||||
|
//then
|
||||||
|
executor.awaitTermination(1, TimeUnit.SECONDS);
|
||||||
|
executor.shutdown();
|
||||||
|
assertEquals(consumer.numberOfConsumedElements.get(), 1);
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user