Merge pull request #8196 from alimate/BAEL-3521

BAEL-3521: RejectedExecutionHandler
This commit is contained in:
Eric Martin 2019-11-25 19:12:53 -06:00 committed by GitHub
commit ec28cd0dc2
2 changed files with 151 additions and 3 deletions

View File

@ -1,7 +1,9 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.baeldung</groupId>
<artifactId>core-java-concurrency-advanced-3</artifactId>
<version>0.1.0-SNAPSHOT</version>
<name>core-java-concurrency-advanced-3</name>
@ -14,6 +16,15 @@
<relativePath>../../parent-java</relativePath>
</parent>
<dependencies>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<finalName>core-java-concurrency-advanced-3</finalName>
<plugins>
@ -35,6 +46,7 @@
</build>
<properties>
<assertj.version>3.14.0</assertj.version>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

View File

@ -0,0 +1,136 @@
package com.baeldung.rejection;
import org.junit.After;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.*;
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy;
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy;
import java.util.concurrent.ThreadPoolExecutor.DiscardOldestPolicy;
import java.util.concurrent.ThreadPoolExecutor.DiscardPolicy;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
public class SaturationPolicyUnitTest {
private ThreadPoolExecutor executor;
@After
public void shutdownExecutor() {
if (executor != null && !executor.isTerminated()) {
executor.shutdownNow();
}
}
@Test
public void givenAbortPolicy_WhenSaturated_ThenShouldThrowRejectedExecutionException() {
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new AbortPolicy());
executor.execute(() -> waitFor(100));
assertThatThrownBy(() -> executor.execute(() -> System.out.println("Will be rejected"))).isInstanceOf(RejectedExecutionException.class);
}
@Test
public void givenCallerRunsPolicy_WhenSaturated_ThenTheCallerThreadRunsTheTask() {
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new CallerRunsPolicy());
executor.execute(() -> waitFor(100));
long startTime = System.nanoTime();
executor.execute(() -> waitFor(100));
double blockedDuration = (System.nanoTime() - startTime) / 1_000_000.0;
assertThat(blockedDuration).isGreaterThanOrEqualTo(100);
}
@Test
public void givenDiscardPolicy_WhenSaturated_ThenExecutorDiscardsTheNewTask() throws InterruptedException {
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new SynchronousQueue<>(), new DiscardPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("Result"));
assertThat(queue.poll(200, MILLISECONDS)).isNull();
}
@Test
public void givenDiscardOldestPolicy_WhenSaturated_ThenExecutorDiscardsTheOldestTask() {
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new ArrayBlockingQueue<>(2), new DiscardOldestPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).containsExactlyInAnyOrder("Second", "Third");
}
@Test
public void givenGrowPolicy_WhenSaturated_ThenExecutorIncreaseTheMaxPoolSize() {
executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new ArrayBlockingQueue<>(2), new GrowPolicy());
executor.execute(() -> waitFor(100));
BlockingQueue<String> queue = new LinkedBlockingDeque<>();
executor.execute(() -> queue.offer("First"));
executor.execute(() -> queue.offer("Second"));
executor.execute(() -> queue.offer("Third"));
waitFor(150);
List<String> results = new ArrayList<>();
queue.drainTo(results);
assertThat(results).containsExactlyInAnyOrder("First", "Second", "Third");
}
@Test
public void givenExecutorIsTerminated_WhenSubmittedNewTask_ThenTheSaturationPolicyApplies() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.shutdownNow();
assertThatThrownBy(() -> executor.execute(() -> {
})).isInstanceOf(RejectedExecutionException.class);
}
@Test
public void givenExecutorIsTerminating_WhenSubmittedNewTask_ThenTheSaturationPolicyApplies() {
ThreadPoolExecutor executor = new ThreadPoolExecutor(1, 1, 0, MILLISECONDS, new LinkedBlockingQueue<>());
executor.execute(() -> waitFor(100));
executor.shutdown();
assertThatThrownBy(() -> executor.execute(() -> {
})).isInstanceOf(RejectedExecutionException.class);
}
private static class GrowPolicy implements RejectedExecutionHandler {
private final Lock lock = new ReentrantLock();
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
lock.lock();
try {
executor.setMaximumPoolSize(executor.getMaximumPoolSize() + 1);
} finally {
lock.unlock();
}
executor.submit(r);
}
}
private void waitFor(int millis) {
try {
Thread.sleep(millis);
} catch (InterruptedException ignored) {
}
}
}