BAEL-5924 Java 8 Stream with Batch Processing Support (#13203)

This commit is contained in:
alemoles 2023-01-18 09:48:57 -03:00 committed by GitHub
parent a4f6d89cc7
commit de6d2bc25c
4 changed files with 254 additions and 0 deletions

View File

@ -59,6 +59,36 @@
<version>3.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>${rx.java3.version}</version>
</dependency>
<dependency>
<groupId>io.vavr</groupId>
<artifactId>vavr</artifactId>
<version>${io.varv.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${io.reactor3.version}</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>${apache.commons.collection4.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${google.guava.version}</version>
</dependency>
<dependency>
<groupId>com.oath.cyclops</groupId>
<artifactId>cyclops</artifactId>
<version>${cyclops.version}</version>
</dependency>
</dependencies>
<build>
@ -90,6 +120,12 @@
<maven.compiler.target>12</maven.compiler.target>
<rx.java.version>1.2.5</rx.java.version>
<rx.java2.version>2.2.2</rx.java2.version>
<rx.java3.version>3.1.5</rx.java3.version>
<io.varv.version>1.0.0-alpha-4</io.varv.version>
<io.reactor3.version>3.5.1</io.reactor3.version>
<apache.commons.collection4.version>4.4</apache.commons.collection4.version>
<google.guava.version>31.1-jre</google.guava.version>
<cyclops.version>10.4.1</cyclops.version>
</properties>
</project>

View File

@ -0,0 +1,47 @@
package com.baeldung.streams.processing;
import static java.util.Spliterator.ORDERED;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Spliterators;
import java.util.stream.Stream;
import java.util.stream.StreamSupport;
public class CustomBatchIterator<T> implements Iterator<List<T>> {
private final int batchSize;
private List<T> currentBatch;
private final Iterator<T> iterator;
public CustomBatchIterator(Iterator<T> sourceIterator, int batchSize) {
this.batchSize = batchSize;
this.iterator = sourceIterator;
}
@Override
public List<T> next() {
return currentBatch;
}
@Override
public boolean hasNext() {
prepareNextBatch();
return currentBatch != null && !currentBatch.isEmpty();
}
public static <T> Stream<List<T>> batchStreamOf(Stream<T> stream, int batchSize) {
return stream(new CustomBatchIterator<>(stream.iterator(), batchSize));
}
private static <T> Stream<T> stream(Iterator<T> iterator) {
return StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, ORDERED), false);
}
private void prepareNextBatch() {
currentBatch = new ArrayList<>(batchSize);
while (iterator.hasNext() && currentBatch.size() < batchSize) {
currentBatch.add(iterator.next());
}
}
}

View File

@ -0,0 +1,141 @@
package com.baeldung.streams.processing;
import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.commons.collections4.ListUtils;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import com.google.common.collect.Iterators;
import cyclops.data.LazySeq;
import cyclops.reactive.ReactiveSeq;
import io.reactivex.rxjava3.core.Observable;
import reactor.core.publisher.Flux;
public class StreamProcessingUnitTest {
public final int BATCH_SIZE = 10;
private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);
public Stream<Integer> data;
@BeforeEach
public void setUp() {
data = IntStream.range(0, 34)
.boxed();
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingSpliterator_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = new ArrayList<>();
CustomBatchIterator.batchStreamOf(data, BATCH_SIZE)
.forEach(result::add);
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCollectionAPI_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = data.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
.values();
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchParallelUsingCollectionAPI_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = data.parallel()
.collect(Collectors.groupingBy(it -> it / BATCH_SIZE))
.values();
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingRxJavaV3_thenFourBatchesAreObtained() {
// RxJava v3
Collection<List<Integer>> result = new ArrayList<>();
Observable.fromStream(data)
.buffer(BATCH_SIZE)
.subscribe(result::add);
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingReactor_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = new ArrayList<>();
Flux.fromStream(data)
.buffer(BATCH_SIZE)
.subscribe(result::add);
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingApacheCommon_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = new ArrayList<>(ListUtils.partition(data.collect(Collectors.toList()), BATCH_SIZE));
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingGuava_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = new ArrayList<>();
Iterators.partition(data.iterator(), BATCH_SIZE)
.forEachRemaining(result::add);
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclops_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = new ArrayList<>();
ReactiveSeq.fromStream(data)
.grouped(BATCH_SIZE)
.toList()
.forEach(value -> result.add(value.collect(Collectors.toList())));
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingCyclopsLazy_thenFourBatchesAreObtained() {
Collection<List<Integer>> result = new ArrayList<>();
LazySeq.fromStream(data)
.grouped(BATCH_SIZE)
.toList()
.forEach(value -> result.add(value.collect(Collectors.toList())));
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
}

View File

@ -0,0 +1,30 @@
package com.baeldung.streams.processing.vavr;
import static org.junit.jupiter.api.Assertions.assertTrue;
import org.junit.jupiter.api.Test;
import com.baeldung.streams.processing.StreamProcessingUnitTest;
import io.vavr.collection.List;
import io.vavr.collection.Stream;
public class StreamProcessingWithVavrUnitTest extends StreamProcessingUnitTest {
private final List<Integer> firstBatch = List.of(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
private final List<Integer> secondBatch = List.of(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
private final List<Integer> thirdBatch = List.of(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
private final List<Integer> fourthBatch = List.of(30, 31, 32, 33);
@Test
public void givenAStreamOfData_whenIsProcessingInBatchUsingVavr_thenFourBatchesAreObtained() {
List<List<Integer>> result = Stream.ofAll(data)
.toList()
.grouped(BATCH_SIZE)
.toList();
assertTrue(result.contains(firstBatch));
assertTrue(result.contains(secondBatch));
assertTrue(result.contains(thirdBatch));
assertTrue(result.contains(fourthBatch));
}
}