From bd1e69f5bf972261336a251fc4ef974536e3d678 Mon Sep 17 00:00:00 2001 From: Ali Dehghani Date: Sat, 16 Nov 2019 00:06:11 +0330 Subject: [PATCH 1/3] Sample codes for RejectedExecutionHandler article. --- .../rejection/SaturationPolicyUnitTest.java | 137 ++++++++++++++++++ 1 file changed, 137 insertions(+) create mode 100644 core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java diff --git a/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java b/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java new file mode 100644 index 0000000000..fa0f299c4c --- /dev/null +++ b/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java @@ -0,0 +1,137 @@ +package com.baeldung.rejection; + +import org.junit.After; +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.*; +import java.util.concurrent.ThreadPoolExecutor.AbortPolicy; +import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy; +import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy; +import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static java.util.concurrent.TimeUnit.MILLISECONDS; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +public class SaturationPolicyUnitTest { + + private ThreadPoolExecutor executor; + + @After + public void shutdownExecutor() { + if (executor != null && !executor.isTerminated()) { + executor.shutdownNow(); + } + } + + @Test + public void givenAbortPolicy_WhenSaturated_ThenShouldThrowRejectedExecutionException() { + executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new AbortPolicy()); + executor.execute(() -> waitFor(100)); + + assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected"))) + .isInstanceOf(RejectedExecutionException.class); + } + + @Test + public void givenCallerRunsPolicy_WhenSaturated_ThenTheCallerThreadRunsTheTask() { + executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new CallerRunsPolicy()); + executor.execute(() -> waitFor(100)); + + long startTime = System.nanoTime(); + executor.execute(() -> waitFor(100)); + double blockedDuration = (System.nanoTime() - startTime) / 1_000_000.0; + + assertThat(blockedDuration).isGreaterThanOrEqualTo(100); + } + + @Test + public void givenDiscardPolicy_WhenSaturated_ThenExecutorDiscardsTheNewTask() throws InterruptedException { + executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new DiscardPolicy()); + executor.execute(() -> waitFor(100)); + + BlockingQueue queue = new LinkedBlockingDeque<>(); + executor.execute(() -> queue.offer("Result")); + + assertThat(queue.poll(200, MILLISECONDS)).isNull(); + } + + @Test + public void givenDiscardOldestPolicy_WhenSaturated_ThenExecutorDiscardsTheOldestTask() { + executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new ArrayBlockingQueue<>(2), new DiscardOldestPolicy()); + executor.execute(() -> waitFor(100)); + + BlockingQueue queue = new LinkedBlockingDeque<>(); + executor.execute(() -> queue.offer("First")); + executor.execute(() -> queue.offer("Second")); + executor.execute(() -> queue.offer("Third")); + + waitFor(150); + List results = new ArrayList<>(); + queue.drainTo(results); + assertThat(results).containsExactlyInAnyOrder("Second", "Third"); + } + + @Test + public void givenGrowPolicy_WhenSaturated_ThenExecutorIncreaseTheMaxPoolSize() { + executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new ArrayBlockingQueue<>(2), new GrowPolicy()); + executor.execute(() -> waitFor(100)); + + BlockingQueue queue = new LinkedBlockingDeque<>(); + executor.execute(() -> queue.offer("First")); + executor.execute(() -> queue.offer("Second")); + executor.execute(() -> queue.offer("Third")); + + waitFor(150); + List results = new ArrayList<>(); + queue.drainTo(results); + assertThat(results).containsExactlyInAnyOrder("First", "Second", "Third"); + } + + @Test + public void givenExecutorIsTerminated_WhenSubmittedNewTask_ThenTheSaturationPolicyApplies() { + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>()); + executor.shutdownNow(); + + assertThatThrownBy(() -> executor.execute(() -> {})) + .isInstanceOf(RejectedExecutionException.class); + } + + @Test + public void givenExecutorIsTerminating_WhenSubmittedNewTask_ThenTheSaturationPolicyApplies() { + ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>()); + executor.execute(() -> waitFor(100)); + executor.shutdown(); + + assertThatThrownBy(() -> executor.execute(() -> {})) + .isInstanceOf(RejectedExecutionException.class); + } + + private static class GrowPolicy implements RejectedExecutionHandler { + + private final Lock lock = new ReentrantLock(); + + @Override + public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { + lock.lock(); + try { + executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1); + } finally { + lock.unlock(); + } + + executor.submit(r); + } + } + + private void waitFor(int millis) { + try { + Thread.sleep(millis); + } catch (InterruptedException ignored) { + } + } +} From c0fdbd4a80ebabb4cc6425cef5fb936034c43a0c Mon Sep 17 00:00:00 2001 From: Ali Dehghani Date: Wed, 20 Nov 2019 08:05:39 +0330 Subject: [PATCH 2/3] Sample codes for RejectedExecutionHandler article. --- .../baeldung/rejection/SaturationPolicyUnitTest.java | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java b/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java index fa0f299c4c..83f1d49a1b 100644 --- a/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java +++ b/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java @@ -33,8 +33,7 @@ public class SaturationPolicyUnitTest { executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new AbortPolicy()); executor.execute(() -> waitFor(100)); - assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected"))) - .isInstanceOf(RejectedExecutionException.class); + assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected"))).isInstanceOf(RejectedExecutionException.class); } @Test @@ -97,8 +96,7 @@ public class SaturationPolicyUnitTest { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>()); executor.shutdownNow(); - assertThatThrownBy(() -> executor.execute(() -> {})) - .isInstanceOf(RejectedExecutionException.class); + assertThatThrownBy(() -> executor.execute(() -> {})).isInstanceOf(RejectedExecutionException.class); } @Test @@ -107,8 +105,8 @@ public class SaturationPolicyUnitTest { executor.execute(() -> waitFor(100)); executor.shutdown(); - assertThatThrownBy(() -> executor.execute(() -> {})) - .isInstanceOf(RejectedExecutionException.class); + assertThatThrownBy(() -> executor.execute(() -> { + })).isInstanceOf(RejectedExecutionException.class); } private static class GrowPolicy implements RejectedExecutionHandler { From b8a45c8ed206ff65937f5afc5a5f13ede1e48fc1 Mon Sep 17 00:00:00 2001 From: Ali Dehghani Date: Mon, 25 Nov 2019 22:09:20 +0330 Subject: [PATCH 3/3] Using a new module. --- .../core-java-concurrency-advanced-3/pom.xml | 15 ++++++++++++--- .../rejection/SaturationPolicyUnitTest.java | 3 ++- 2 files changed, 14 insertions(+), 4 deletions(-) rename core-java-modules/{core-java-concurrency-advanced-2 => core-java-concurrency-advanced-3}/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java (97%) diff --git a/core-java-modules/core-java-concurrency-advanced-3/pom.xml b/core-java-modules/core-java-concurrency-advanced-3/pom.xml index cc7b7b1e70..2860e3e698 100644 --- a/core-java-modules/core-java-concurrency-advanced-3/pom.xml +++ b/core-java-modules/core-java-concurrency-advanced-3/pom.xml @@ -1,7 +1,9 @@ - + + 4.0.0 - com.baeldung core-java-concurrency-advanced-3 0.1.0-SNAPSHOT core-java-concurrency-advanced-3 @@ -15,6 +17,12 @@ + + org.assertj + assertj-core + ${assertj.version} + test + @@ -28,6 +36,7 @@ + 3.14.0 diff --git a/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java b/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java similarity index 97% rename from core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java rename to core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java index 83f1d49a1b..5016cc1d06 100644 --- a/core-java-modules/core-java-concurrency-advanced-2/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java +++ b/core-java-modules/core-java-concurrency-advanced-3/src/test/java/com/baeldung/rejection/SaturationPolicyUnitTest.java @@ -96,7 +96,8 @@ public class SaturationPolicyUnitTest { ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>()); executor.shutdownNow(); - assertThatThrownBy(() -> executor.execute(() -> {})).isInstanceOf(RejectedExecutionException.class); + assertThatThrownBy(() -> executor.execute(() -> { + })).isInstanceOf(RejectedExecutionException.class); } @Test