Work stealing framework

This commit is contained in:
Thabo Ntsoko 2019-12-27 16:47:45 +02:00
parent b63a8c2335
commit f87723e7ad
3 changed files with 196 additions and 0 deletions

View File

@ -0,0 +1,84 @@
package com.baeldung.workstealing;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ForkJoinTask;
import java.util.concurrent.RecursiveAction;
import java.util.concurrent.atomic.AtomicInteger;
public class PrimeNumbers extends RecursiveAction {
private int lowerBound;
private int upperBound;
private int granularity;
static final List<Integer> GRANULARITIES
= Arrays.asList(1, 10, 100, 1000, 10000);
private AtomicInteger noOfPrimeNumbers = new AtomicInteger();
PrimeNumbers(int lowerBound, int upperBound, int granularity) {
this.lowerBound = lowerBound;
this.upperBound = upperBound;
this.granularity = granularity;
}
PrimeNumbers(int upperBound) {
this(1, upperBound, 100);
}
private PrimeNumbers(int lowerBound, int upperBound) {
this(lowerBound, upperBound, 100);
}
private List<PrimeNumbers> subTasks() {
List<PrimeNumbers> subTasks = new ArrayList<>();
for (int i = 1; i <= this.upperBound / granularity; i++) {
int upper = i * granularity;
int lower = (upper - granularity) + 1;
subTasks.add(new PrimeNumbers(lower, upper));
}
return subTasks;
}
@Override
protected void compute() {
if (((upperBound + 1) - lowerBound) > granularity) {
ForkJoinTask.invokeAll(subTasks());
} else {
findPrimeNumbers();
}
}
void findPrimeNumbers() {
for (int num = lowerBound; num <= upperBound; num++) {
if (isPrime(num)) {
noOfPrimeNumbers.getAndIncrement();
}
}
}
private boolean isPrime(int number) {
if (number == 2) {
return true;
}
if (number == 1 || number % 2 == 0) {
return false;
}
int noOfNaturalNumbers = 0;
for (int i = 1; i <= number; i++) {
if (number % i == 0) {
noOfNaturalNumbers++;
}
}
return noOfNaturalNumbers == 2;
}
public int noOfPrimeNumbers() {
return noOfPrimeNumbers.intValue();
}
}

View File

@ -0,0 +1,101 @@
package com.baeldung.workstealing;
import org.junit.Test;
import org.openjdk.jmh.annotations.*;
import org.openjdk.jmh.runner.Runner;
import org.openjdk.jmh.runner.RunnerException;
import org.openjdk.jmh.runner.options.Options;
import org.openjdk.jmh.runner.options.OptionsBuilder;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.logging.Logger;
import static org.junit.Assert.fail;
public class PrimeNumbersUnitTest {
private static Logger logger = Logger.getAnonymousLogger();
@Test
public void givenPrimesCalculated_whenUsingPoolsAndOneThread_thenOneThreadSlowest() {
Options opt = new OptionsBuilder()
.include(Benchmarker.class.getSimpleName())
.forks(1)
.build();
try {
new Runner(opt).run();
} catch (RunnerException e) {
fail();
}
}
@Test
public void givenNewWorkStealingPool_whenGettingPrimes_thenStealCountChanges() {
StringBuilder info = new StringBuilder();
for (int granularity : PrimeNumbers.GRANULARITIES) {
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool pool =
(ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealCountInfo(info, granularity, pool);
}
logger.info("\nExecutors.newWorkStealingPool ->" + info.toString());
}
@Test
public void givenCommonPool_whenGettingPrimes_thenStealCountChangesSlowly() {
StringBuilder info = new StringBuilder();
for (int granularity : PrimeNumbers.GRANULARITIES) {
ForkJoinPool pool = ForkJoinPool.commonPool();
stealCountInfo(info, granularity, pool);
}
logger.info("\nForkJoinPool.commonPool ->" + info.toString());
}
private void stealCountInfo(StringBuilder info, int granularity, ForkJoinPool forkJoinPool) {
PrimeNumbers primes = new PrimeNumbers(1, 10000, granularity);
forkJoinPool.invoke(primes);
forkJoinPool.shutdown();
long steals = forkJoinPool.getStealCount();
String output = "\nGranularity: [" + granularity + "], Steals: [" + steals + "]";
info.append(output);
}
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.MILLISECONDS)
@State(Scope.Benchmark)
@Fork(value = 2, warmups = 1, jvmArgs = {"-Xms2G", "-Xmx2G"})
public static class Benchmarker {
@Benchmark
public void singleThread() {
PrimeNumbers primes = new PrimeNumbers(10000);
primes.findPrimeNumbers(); // get prime numbers using a single thread
}
@Benchmark
public void commonPoolBenchmark() {
PrimeNumbers primes = new PrimeNumbers(10000);
ForkJoinPool pool = ForkJoinPool.commonPool();
pool.invoke(primes);
pool.shutdown();
}
@Benchmark
public void newWorkStealingPoolBenchmark() {
PrimeNumbers primes = new PrimeNumbers(10000);
int parallelism = ForkJoinPool.getCommonPoolParallelism();
ForkJoinPool stealer = (ForkJoinPool) Executors.newWorkStealingPool(parallelism);
stealer.invoke(primes);
stealer.shutdown();
}
}
}

View File

@ -27,11 +27,22 @@
<artifactId>commons-io</artifactId>
<version>${commons.io.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</dependency>
</dependencies>
<properties>
<guava.version>23.0</guava.version>
<commons.io.version>2.6</commons.io.version>
<jmh.version>1.19</jmh.version>
</properties>
</project>