From de6d2bc25cabd418b944cf7498d8dcffe796253b Mon Sep 17 00:00:00 2001 From: alemoles Date: Wed, 18 Jan 2023 09:48:57 -0300 Subject: [PATCH] BAEL-5924 Java 8 Stream with Batch Processing Support (#13203) --- core-java-modules/core-java-streams-4/pom.xml | 36 +++++ .../processing/CustomBatchIterator.java | 47 ++++++ .../processing/StreamProcessingUnitTest.java | 141 ++++++++++++++++++ .../StreamProcessingWithVavrUnitTest.java | 30 ++++ 4 files changed, 254 insertions(+) create mode 100644 core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java create mode 100644 core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java create mode 100644 core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java diff --git a/core-java-modules/core-java-streams-4/pom.xml b/core-java-modules/core-java-streams-4/pom.xml index ed4603796d..46c0f3f7e1 100644 --- a/core-java-modules/core-java-streams-4/pom.xml +++ b/core-java-modules/core-java-streams-4/pom.xml @@ -59,6 +59,36 @@ 3.12.0 test + + io.reactivex.rxjava3 + rxjava + ${rx.java3.version} + + + io.vavr + vavr + ${io.varv.version} + + + io.projectreactor + reactor-core + ${io.reactor3.version} + + + org.apache.commons + commons-collections4 + ${apache.commons.collection4.version} + + + com.google.guava + guava + ${google.guava.version} + + + com.oath.cyclops + cyclops + ${cyclops.version} + @@ -90,6 +120,12 @@ 12 1.2.5 2.2.2 + 3.1.5 + 1.0.0-alpha-4 + 3.5.1 + 4.4 + 31.1-jre + 10.4.1 \ No newline at end of file diff --git a/core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java b/core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java new file mode 100644 index 0000000000..b5407b7283 --- /dev/null +++ b/core-java-modules/core-java-streams-4/src/main/java/com/baeldung/streams/processing/CustomBatchIterator.java @@ -0,0 +1,47 @@ +package com.baeldung.streams.processing; + +import static java.util.Spliterator.ORDERED; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterators; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +public class CustomBatchIterator implements Iterator> { + private final int batchSize; + private List currentBatch; + private final Iterator iterator; + + public CustomBatchIterator(Iterator sourceIterator, int batchSize) { + this.batchSize = batchSize; + this.iterator = sourceIterator; + } + + @Override + public List next() { + return currentBatch; + } + + @Override + public boolean hasNext() { + prepareNextBatch(); + return currentBatch != null && !currentBatch.isEmpty(); + } + + public static Stream> batchStreamOf(Stream stream, int batchSize) { + return stream(new CustomBatchIterator<>(stream.iterator(), batchSize)); + } + + private static Stream stream(Iterator iterator) { + return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false); + } + + private void prepareNextBatch() { + currentBatch = new ArrayList<>(batchSize); + while (iterator.hasNext() && currentBatch.size() < batchSize) { + currentBatch.add(iterator.next()); + } + } +} \ No newline at end of file diff --git a/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java new file mode 100644 index 0000000000..f8f88387d5 --- /dev/null +++ b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/StreamProcessingUnitTest.java @@ -0,0 +1,141 @@ +package com.baeldung.streams.processing; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import org.apache.commons.collections4.ListUtils; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import com.google.common.collect.Iterators; + +import cyclops.data.LazySeq; +import cyclops.reactive.ReactiveSeq; +import io.reactivex.rxjava3.core.Observable; +import reactor.core.publisher.Flux; + +public class StreamProcessingUnitTest { + public final int BATCH_SIZE = 10; + + private final List firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + private final List secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19); + private final List thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + private final List fourthBatch = List.of(30, 31, 32, 33); + + public Stream data; + + @BeforeEach + public void setUp() { + data = IntStream.range(0, 34) + .boxed(); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() { + Collection> result = new ArrayList<>(); + CustomBatchIterator.batchStreamOf(data, BATCH_SIZE) + .forEach(result::add); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() { + Collection> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE)) + .values(); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() { + Collection> result = data.parallel() + .collect(Collectors.groupingBy(it -> it / BATCH_SIZE)) + .values(); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() { + // RxJava v3 + Collection> result = new ArrayList<>(); + Observable.fromStream(data) + .buffer(BATCH_SIZE) + .subscribe(result::add); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() { + Collection> result = new ArrayList<>(); + Flux.fromStream(data) + .buffer(BATCH_SIZE) + .subscribe(result::add); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() { + Collection> result = new ArrayList<>(ListUtils.partition(data.collect(Collectors.toList()), BATCH_SIZE)); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() { + Collection> result = new ArrayList<>(); + Iterators.partition(data.iterator(), BATCH_SIZE) + .forEachRemaining(result::add); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() { + Collection> result = new ArrayList<>(); + ReactiveSeq.fromStream(data) + .grouped(BATCH_SIZE) + .toList() + .forEach(value -> result.add(value.collect(Collectors.toList()))); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() { + Collection> result = new ArrayList<>(); + LazySeq.fromStream(data) + .grouped(BATCH_SIZE) + .toList() + .forEach(value -> result.add(value.collect(Collectors.toList()))); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } +} \ No newline at end of file diff --git a/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java new file mode 100644 index 0000000000..859b059889 --- /dev/null +++ b/core-java-modules/core-java-streams-4/src/test/java/com/baeldung/streams/processing/vavr/StreamProcessingWithVavrUnitTest.java @@ -0,0 +1,30 @@ +package com.baeldung.streams.processing.vavr; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +import org.junit.jupiter.api.Test; + +import com.baeldung.streams.processing.StreamProcessingUnitTest; + +import io.vavr.collection.List; +import io.vavr.collection.Stream; + +public class StreamProcessingWithVavrUnitTest extends StreamProcessingUnitTest { + + private final List firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + private final List secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19); + private final List thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + private final List fourthBatch = List.of(30, 31, 32, 33); + + @Test + public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() { + List> result = Stream.ofAll(data) + .toList() + .grouped(BATCH_SIZE) + .toList(); + assertTrue(result.contains(firstBatch)); + assertTrue(result.contains(secondBatch)); + assertTrue(result.contains(thirdBatch)); + assertTrue(result.contains(fourthBatch)); + } +}