BAEL-6782: partitioning streams (#14833)

* BAEL-6782: partitioning streams

* BAEL-6782: code reivew

* BAEL-6782: line continuations
This commit is contained in:
etrandafir93 2023-10-07 06:11:47 +02:00 committed by GitHub
parent 5dcd7e88d0
commit 5aea5d5c29
3 changed files with 178 additions and 0 deletions

View File

@ -43,6 +43,11 @@
<artifactId>vavr</artifactId>
<version>${vavr.version}</version>
</dependency>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
</dependency>
</dependencies>
<build>
@ -72,6 +77,7 @@
<maven.compiler.source>12</maven.compiler.source>
<maven.compiler.target>12</maven.compiler.target>
<vavr.version>0.10.2</vavr.version>
<guava.version>32.1.2-jre</guava.version>
</properties>
</project>

View File

@ -0,0 +1,90 @@
package com.baeldung.streams.partitioning;
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiConsumer;
import java.util.function.BinaryOperator;
import java.util.function.Function;
import java.util.function.Supplier;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
import com.google.common.collect.Iterables;
public class PartitionStream {
public static <T> Stream<List<T>> partitionList(List<T> source, int batchSize) {
if (batchSize <= 0) {
throw new IllegalArgumentException(String.format("Expected the batchSize to be greater than ZERO, actual value was: %s", batchSize));
}
if (source.isEmpty()) {
return Stream.empty();
}
int nrOfFullBatches = (source.size() - 1) / batchSize;
return IntStream.rangeClosed(0, nrOfFullBatches)
.mapToObj(batch -> {
int startIndex = batch * batchSize;
int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize;
return source.subList(startIndex, endIndex);
});
}
public static <T> Iterable<List<T>> partitionUsingGuava(Stream<T> source, int batchSize) {
return Iterables.partition(source::iterator, batchSize);
}
public static <T> List<List<T>> partitionStream(Stream<T> source, int batchSize) {
return source.collect(partitionBySize(batchSize, Collectors.toList()));
}
public static <T, A, R> Collector<T, ?, R> partitionBySize(int batchSize, Collector<List<T>, A, R> downstream) {
Supplier<Accumulator<T, A>> supplier = () -> new Accumulator<>(
batchSize,
downstream.supplier().get(),
downstream.accumulator()::accept
);
BiConsumer<Accumulator<T, A>, T> accumulator = (acc, value) -> acc.add(value);
BinaryOperator<Accumulator<T, A>> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner());
Function<Accumulator<T, A>, R> finisher = acc -> {
if (!acc.values.isEmpty()) {
downstream.accumulator().accept(acc.downstreamAccumulator, acc.values);
}
return downstream.finisher().apply(acc.downstreamAccumulator);
};
return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED);
}
static class Accumulator<T, A> {
private final List<T> values = new ArrayList<>();
private final int batchSize;
private A downstreamAccumulator;
private final BiConsumer<A, List<T>> batchFullListener;
Accumulator(int batchSize, A accumulator, BiConsumer<A, List<T>> onBatchFull) {
this.batchSize = batchSize;
this.downstreamAccumulator = accumulator;
this.batchFullListener = onBatchFull;
}
void add(T value) {
values.add(value);
if (values.size() == batchSize) {
batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values));
values.clear();
}
}
Accumulator<T, A> combine(Accumulator<T, A> other, BinaryOperator<A> accumulatorCombiner) {
this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator);
other.values.forEach(this::add);
return this;
}
}
}

View File

@ -0,0 +1,82 @@
package com.baeldung.partitioning;
import static com.baeldung.streams.partitioning.PartitionStream.partitionList;
import static com.baeldung.streams.partitioning.PartitionStream.partitionStream;
import static com.baeldung.streams.partitioning.PartitionStream.partitionUsingGuava;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.assertj.core.api.Assertions.atIndex;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.stream.Stream;
import org.junit.jupiter.api.Test;
public class PartitionStreamsUnitTest {
@Test
void whenPartitionList_thenReturnThreeSubLists() {
List<Integer> source = List.of(1, 2, 3, 4, 5, 6, 7, 8);
Stream<List<Integer>> result = partitionList(source, 3);
assertThat(result)
.containsExactlyInAnyOrder(
List.of(1, 2, 3),
List.of(4, 5, 6),
List.of(7, 8)
);
}
@Test
void whenPartitionEmptyList_thenReturnEmptyStream() {
Stream<List<Integer>> result = partitionList(Collections.emptyList(), 3);
assertThat(result).isEmpty();
}
@Test
void whenPartitionListWithNegativeBatchSize_thenThrowException() {
assertThatThrownBy(() -> partitionList(List.of(1,2,3), -1))
.isInstanceOf(IllegalArgumentException.class)
.hasMessage("Expected the batchSize to be greater than ZERO, actual value was: -1");
}
@Test
void whenPartitionParallelStream_thenReturnThreeSubLists() {
Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();
List<List<Integer>> result = partitionStream(source, 3);
assertThat(result)
.hasSize(3)
.satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0))
.satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1))
.satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2));
}
@Test
void whenPartitionEmptyParallelStream_thenReturnEmptyList() {
Stream<Integer> source = Stream.<Integer>empty().parallel();
List<List<Integer>> result = partitionStream(source, 3);
assertThat(result).isEmpty();
}
@Test
void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() {
Stream<Integer> source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel();
Iterable<List<Integer>> result = partitionUsingGuava(source, 3);
assertThat(result)
.map(ArrayList::new)
.hasSize(3)
.satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0))
.satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1))
.satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2));
}
}