diff --git a/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayObject.java b/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayObject.java new file mode 100644 index 0000000000..6db0a66b1e --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayObject.java @@ -0,0 +1,39 @@ +package com.baeldung.concurrent.delayqueue; + +import java.util.concurrent.Delayed; +import java.util.concurrent.TimeUnit; + +public class DelayObject implements Delayed { + private String data; + private long startTime; + + public DelayObject(String data, long delayInMilliseconds) { + this.data = data; + this.startTime = System.currentTimeMillis() + delayInMilliseconds; + } + + @Override + public long getDelay(TimeUnit unit) { + long diff = startTime - System.currentTimeMillis(); + return unit.convert(diff, TimeUnit.MILLISECONDS); + } + + @Override + public int compareTo(Delayed o) { + if (this.startTime < ((DelayObject) o).startTime) { + return -1; + } + if (this.startTime > ((DelayObject) o).startTime) { + return 1; + } + return 0; + } + + @Override + public String toString() { + return "{" + + "data='" + data + '\'' + + ", startTime=" + startTime + + '}'; + } +} \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayQueueConsumer.java b/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayQueueConsumer.java new file mode 100644 index 0000000000..8a969bf7aa --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayQueueConsumer.java @@ -0,0 +1,30 @@ +package com.baeldung.concurrent.delayqueue; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.atomic.AtomicInteger; + + +public class DelayQueueConsumer implements Runnable { + private BlockingQueue queue; + private final Integer numberOfElementsToTake; + public final AtomicInteger numberOfConsumedElements = new AtomicInteger(); + + public DelayQueueConsumer(BlockingQueue queue, Integer numberOfElementsToTake) { + this.queue = queue; + this.numberOfElementsToTake = numberOfElementsToTake; + } + + + @Override + public void run() { + for (int i = 0; i < numberOfElementsToTake; i++) { + try { + DelayObject object = queue.take(); + numberOfConsumedElements.incrementAndGet(); + System.out.println("Consumer take: " + object); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayQueueProducer.java b/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayQueueProducer.java new file mode 100644 index 0000000000..617f19b9ac --- /dev/null +++ b/core-java/src/main/java/com/baeldung/concurrent/delayqueue/DelayQueueProducer.java @@ -0,0 +1,34 @@ +package com.baeldung.concurrent.delayqueue; + +import java.util.UUID; +import java.util.concurrent.BlockingQueue; + + +public class DelayQueueProducer implements Runnable { + private BlockingQueue queue; + private final Integer numberOfElementsToProduce; + private final Integer delayOfEachProducedMessageMilliseconds; + + public DelayQueueProducer(BlockingQueue queue, + Integer numberOfElementsToProduce, + Integer delayOfEachProducedMessageMilliseconds) { + this.queue = queue; + this.numberOfElementsToProduce = numberOfElementsToProduce; + this.delayOfEachProducedMessageMilliseconds = delayOfEachProducedMessageMilliseconds; + } + + @Override + public void run() { + for (int i = 0; i < numberOfElementsToProduce; i++) { + DelayObject object + = new DelayObject(UUID.randomUUID().toString(), delayOfEachProducedMessageMilliseconds); + System.out.println("Put object = " + object); + try { + queue.put(object); + Thread.sleep(500); + } catch (InterruptedException ie) { + ie.printStackTrace(); + } + } + } +} \ No newline at end of file diff --git a/core-java/src/test/java/com/baeldung/concurrent/delayqueue/DelayQueueTest.java b/core-java/src/test/java/com/baeldung/concurrent/delayqueue/DelayQueueTest.java new file mode 100644 index 0000000000..e18c087b0a --- /dev/null +++ b/core-java/src/test/java/com/baeldung/concurrent/delayqueue/DelayQueueTest.java @@ -0,0 +1,78 @@ +package com.baeldung.concurrent.delayqueue; + +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runners.MethodSorters; + +import java.util.concurrent.*; + +import static junit.framework.TestCase.assertEquals; + +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class DelayQueueTest { + @Test + public void givenDelayQueue_whenProduceElement_thenShouldConsumeAfterGivenDelay() throws InterruptedException { + //given + ExecutorService executor = Executors.newFixedThreadPool(2); + BlockingQueue queue = new DelayQueue<>(); + int numberOfElementsToProduce = 2; + int delayOfEachProducedMessageMilliseconds = 500; + DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); + DelayQueueProducer producer + = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); + + //when + executor.submit(producer); + executor.submit(consumer); + + //then + executor.awaitTermination(5, TimeUnit.SECONDS); + executor.shutdown(); + assertEquals(consumer.numberOfConsumedElements.get(), numberOfElementsToProduce); + + } + + @Test + public void givenDelayQueue_whenProduceElementWithHugeDelay_thenConsumerWasNotAbleToConsumeMessageInGivenTime() throws InterruptedException { + //given + ExecutorService executor = Executors.newFixedThreadPool(2); + BlockingQueue queue = new DelayQueue<>(); + int numberOfElementsToProduce = 1; + int delayOfEachProducedMessageMilliseconds = 10_000; + DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); + DelayQueueProducer producer + = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); + + //when + executor.submit(producer); + executor.submit(consumer); + + //then + executor.awaitTermination(5, TimeUnit.SECONDS); + executor.shutdown(); + assertEquals(consumer.numberOfConsumedElements.get(), 0); + + } + + @Test + public void givenDelayQueue_whenProduceElementWithNegativeDelay_thenConsumeMessageImmediately() throws InterruptedException { + //given + ExecutorService executor = Executors.newFixedThreadPool(2); + BlockingQueue queue = new DelayQueue<>(); + int numberOfElementsToProduce = 1; + int delayOfEachProducedMessageMilliseconds = -10_000; + DelayQueueConsumer consumer = new DelayQueueConsumer(queue, numberOfElementsToProduce); + DelayQueueProducer producer + = new DelayQueueProducer(queue, numberOfElementsToProduce, delayOfEachProducedMessageMilliseconds); + + //when + executor.submit(producer); + executor.submit(consumer); + + //then + executor.awaitTermination(1, TimeUnit.SECONDS); + executor.shutdown(); + assertEquals(consumer.numberOfConsumedElements.get(), 1); + + } +} \ No newline at end of file