Merge pull request #10727 from dstr89/feature/BAEL-4876-using-parallel-streams-2

BAEL-4876: Parallel stream examples added
This commit is contained in:
davidmartinezbarua 2021-05-16 10:17:16 -03:00 committed by GitHub
commit 9258d74afa
9 changed files with 300 additions and 0 deletions

View File

@ -27,6 +27,17 @@
<version>${lombok.version}</version> <version>${lombok.version}</version>
<scope>provided</scope> <scope>provided</scope>
</dependency> </dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<!-- test scoped --> <!-- test scoped -->
<dependency> <dependency>
<groupId>org.assertj</groupId> <groupId>org.assertj</groupId>
@ -44,11 +55,30 @@
<filtering>true</filtering> <filtering>true</filtering>
</resource> </resource>
</resources> </resources>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
<annotationProcessorPaths>
<path>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
</path>
</annotationProcessorPaths>
</configuration>
</plugin>
</plugins>
</build> </build>
<properties> <properties>
<lombok.version>1.18.20</lombok.version>
<!-- testing --> <!-- testing -->
<assertj.version>3.6.1</assertj.version> <assertj.version>3.6.1</assertj.version>
<jmh.version>1.29</jmh.version>
</properties> </properties>
</project> </project>

View File

@ -0,0 +1,9 @@
package com.baeldung.streams.parallel;
public class BenchmarkRunner {
public static void main(String[] args) throws Exception {
org.openjdk.jmh.Main.main(args);
}
}

View File

@ -0,0 +1,54 @@
package com.baeldung.streams.parallel;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class DifferentSourceSplitting {
private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
private static final List<Integer> linkedListOfNumbers = new LinkedList<>();
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
arrayListOfNumbers.add(i);
linkedListOfNumbers.add(i);
});
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void differentSourceArrayListSequential() {
arrayListOfNumbers.stream().reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void differentSourceArrayListParallel() {
arrayListOfNumbers.parallelStream().reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void differentSourceLinkedListSequential() {
linkedListOfNumbers.stream().reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void differentSourceLinkedListParallel() {
linkedListOfNumbers.parallelStream().reduce(0, Integer::sum);
}
}

View File

@ -0,0 +1,52 @@
package com.baeldung.streams.parallel;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import java.util.Arrays;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class MemoryLocalityCosts {
private static final int[] intArray = new int[1_000_000];
private static final Integer[] integerArray = new Integer[1_000_000];
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
intArray[i-1] = i;
integerArray[i-1] = i;
});
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void localityIntArraySequential() {
Arrays.stream(intArray).reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void localityIntArrayParallel() {
Arrays.stream(intArray).parallel().reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void localityIntegerArraySequential() {
Arrays.stream(integerArray).reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void localityIntegerArrayParallel() {
Arrays.stream(integerArray).parallel().reduce(0, Integer::sum);
}
}

View File

@ -0,0 +1,52 @@
package com.baeldung.streams.parallel;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class MergingCosts {
private static final List<Integer> arrayListOfNumbers = new ArrayList<>();
static {
IntStream.rangeClosed(1, 1_000_000).forEach(i -> {
arrayListOfNumbers.add(i);
});
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void mergingCostsSumSequential() {
arrayListOfNumbers.stream().reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void mergingCostsSumParallel() {
arrayListOfNumbers.stream().parallel().reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void mergingCostsGroupingSequential() {
arrayListOfNumbers.stream().collect(Collectors.toSet());
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void mergingCostsGroupingParallel() {
arrayListOfNumbers.stream().parallel().collect(Collectors.toSet());
}
}

View File

@ -0,0 +1,15 @@
package com.baeldung.streams.parallel;
import java.util.Arrays;
import java.util.List;
public class ParallelStream {
public static void main(String[] args) {
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.parallelStream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);
}
}

View File

@ -0,0 +1,15 @@
package com.baeldung.streams.parallel;
import java.util.Arrays;
import java.util.List;
public class SequentialStream {
public static void main(String[] args) {
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
listOfNumbers.stream().forEach(number ->
System.out.println(number + " " + Thread.currentThread().getName())
);
}
}

View File

@ -0,0 +1,27 @@
package com.baeldung.streams.parallel;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import java.util.concurrent.TimeUnit;
import java.util.stream.IntStream;
public class SplittingCosts {
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void sourceSplittingIntStreamSequential() {
IntStream.rangeClosed(1, 100).reduce(0, Integer::sum);
}
@Benchmark
@BenchmarkMode(Mode.AverageTime)
@OutputTimeUnit(TimeUnit.NANOSECONDS)
public static void sourceSplittingIntStreamParallel() {
IntStream.rangeClosed(1, 100).parallel().reduce(0, Integer::sum);
}
}

View File

@ -0,0 +1,46 @@
package com.baeldung.streams.parallel;
import org.junit.jupiter.api.Test;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import static org.assertj.core.api.Assertions.assertThat;
class ForkJoinUnitTest {
@Test
void givenSequentialStreamOfNumbers_whenReducingSumWithIdentityFive_thenResultIsCorrect() {
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.stream().reduce(5, Integer::sum);
assertThat(sum).isEqualTo(15);
}
@Test
void givenParallelStreamOfNumbers_whenReducingSumWithIdentityFive_thenResultIsNotCorrect() {
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(5, Integer::sum);
assertThat(sum).isNotEqualTo(15);
}
@Test
void givenParallelStreamOfNumbers_whenReducingSumWithIdentityZero_thenResultIsCorrect() {
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
int sum = listOfNumbers.parallelStream().reduce(0, Integer::sum) + 5;
assertThat(sum).isEqualTo(15);
}
@Test
public void givenParallelStreamOfNumbers_whenUsingCustomThreadPool_thenResultIsCorrect()
throws InterruptedException, ExecutionException {
List<Integer> listOfNumbers = Arrays.asList(1, 2, 3, 4);
ForkJoinPool customThreadPool = new ForkJoinPool(4);
int sum = customThreadPool.submit(
() -> listOfNumbers.parallelStream().reduce(0, Integer::sum)).get();
customThreadPool.shutdown();
assertThat(sum).isEqualTo(10);
}
}