Parallel Collectors with Virtual Threads (#16313)
This commit is contained in:
parent
6a81307724
commit
dde9779c83
|
@ -49,11 +49,23 @@
|
||||||
|
|
||||||
<properties>
|
<properties>
|
||||||
<jool.version>0.9.12</jool.version>
|
<jool.version>0.9.12</jool.version>
|
||||||
<parallel-collectors.version>2.6.0</parallel-collectors.version>
|
<parallel-collectors.version>3.0.0</parallel-collectors.version>
|
||||||
<vavr.version>0.9.0</vavr.version>
|
<vavr.version>0.9.0</vavr.version>
|
||||||
<eclipse-collections.version>8.2.0</eclipse-collections.version>
|
<eclipse-collections.version>8.2.0</eclipse-collections.version>
|
||||||
<streamex.version>0.8.1</streamex.version>
|
<streamex.version>0.8.1</streamex.version>
|
||||||
<protonpack.version>1.15</protonpack.version>
|
<protonpack.version>1.15</protonpack.version>
|
||||||
</properties>
|
</properties>
|
||||||
|
|
||||||
|
<build>
|
||||||
|
<plugins>
|
||||||
|
<plugin>
|
||||||
|
<groupId>org.apache.maven.plugins</groupId>
|
||||||
|
<artifactId>maven-compiler-plugin</artifactId>
|
||||||
|
<configuration>
|
||||||
|
<source>21</source>
|
||||||
|
<target>21</target>
|
||||||
|
</configuration>
|
||||||
|
</plugin>
|
||||||
|
</plugins>
|
||||||
|
</build>
|
||||||
</project>
|
</project>
|
||||||
|
|
|
@ -0,0 +1,73 @@
|
||||||
|
package com.baeldung.parallel_collectors;
|
||||||
|
|
||||||
|
import com.pivovarit.collectors.ParallelCollectors;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.time.Duration;
|
||||||
|
import java.time.Instant;
|
||||||
|
import java.util.concurrent.Executors;
|
||||||
|
import java.util.function.Supplier;
|
||||||
|
import java.util.stream.Stream;
|
||||||
|
|
||||||
|
import static java.util.stream.Collectors.toList;
|
||||||
|
|
||||||
|
public class ParallelCollectorsVirtualThreadsManualTest {
|
||||||
|
|
||||||
|
private static final Logger log = LoggerFactory.getLogger(ParallelCollectorsVirtualThreadsManualTest.class);
|
||||||
|
|
||||||
|
// increase the number of parallel processes to find the max number of threads on your machine
|
||||||
|
@Test
|
||||||
|
public void givenParallelism_whenUsingOSThreads_thenShouldRunOutOfThreads() {
|
||||||
|
int parallelProcesses = 50_000;
|
||||||
|
|
||||||
|
var e = Executors.newFixedThreadPool(parallelProcesses);
|
||||||
|
|
||||||
|
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
|
||||||
|
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), e, parallelProcesses))
|
||||||
|
.join());
|
||||||
|
|
||||||
|
log.info("{}", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenParallelism_whenUsingVThreads_thenShouldProcessInParallel() {
|
||||||
|
int parallelProcesses = 1000_000;
|
||||||
|
|
||||||
|
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
|
||||||
|
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList()))
|
||||||
|
.join());
|
||||||
|
|
||||||
|
log.info("{}", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void givenParallelismAndPCollectors2_whenUsingVThreads_thenShouldProcessInParallel() {
|
||||||
|
int parallelProcesses = 1000_000;
|
||||||
|
|
||||||
|
var result = timed(() -> Stream.iterate(0, i -> i + 1).limit(parallelProcesses)
|
||||||
|
.collect(ParallelCollectors.parallel(i -> fetchById(i), toList(), Executors.newVirtualThreadPerTaskExecutor(), Integer.MAX_VALUE))
|
||||||
|
.join());
|
||||||
|
|
||||||
|
log.info("{}", result);
|
||||||
|
}
|
||||||
|
|
||||||
|
private static String fetchById(int id) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// ignore shamelessly
|
||||||
|
}
|
||||||
|
|
||||||
|
return "user-" + id;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static <T> T timed(Supplier<T> supplier) {
|
||||||
|
var before = Instant.now();
|
||||||
|
T result = supplier.get();
|
||||||
|
var after = Instant.now();
|
||||||
|
log.info("Execution time: {} ms", Duration.between(before, after).toMillis());
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
}
|
2
pom.xml
2
pom.xml
|
@ -737,7 +737,7 @@
|
||||||
<module>libraries-security</module>
|
<module>libraries-security</module>
|
||||||
<module>libraries-server-2</module>
|
<module>libraries-server-2</module>
|
||||||
<module>libraries-server</module>
|
<module>libraries-server</module>
|
||||||
<module>libraries-stream</module>
|
<!-- <module>libraries-stream</module> JDK21-->
|
||||||
<module>libraries-testing-2</module>
|
<module>libraries-testing-2</module>
|
||||||
<module>libraries-transform</module>
|
<module>libraries-transform</module>
|
||||||
<module>libraries</module> <!-- very long running -->
|
<module>libraries</module> <!-- very long running -->
|
||||||
|
|
Loading…
Reference in New Issue