From 5aea5d5c296ed1471436de74fb57a603ab98dd44 Mon Sep 17 00:00:00 2001 From: etrandafir93 <75391049+etrandafir93@users.noreply.github.com> Date: Sat, 7 Oct 2023 06:11:47 +0200 Subject: [PATCH] BAEL-6782: partitioning streams (#14833) * BAEL-6782: partitioning streams * BAEL-6782: code reivew * BAEL-6782: line continuations --- core-java-modules/core-java-streams-5/pom.xml | 6 ++ .../streams/partitioning/PartitionStream.java | 90 +++++++++++++++++++ .../PartitionStreamsUnitTest.java | 82 +++++++++++++++++ 3 files changed, 178 insertions(+) create mode 100644 core-java-modules/core-java-streams-5/src/main/java/com/baeldung/streams/partitioning/PartitionStream.java create mode 100644 core-java-modules/core-java-streams-5/src/test/java/com/baeldung/partitioning/PartitionStreamsUnitTest.java diff --git a/core-java-modules/core-java-streams-5/pom.xml b/core-java-modules/core-java-streams-5/pom.xml index dc97d81b3d..d7baf84d30 100644 --- a/core-java-modules/core-java-streams-5/pom.xml +++ b/core-java-modules/core-java-streams-5/pom.xml @@ -43,6 +43,11 @@ vavr ${vavr.version} + + com.google.guava + guava + ${guava.version} + @@ -72,6 +77,7 @@ 12 12 0.10.2 + 32.1.2-jre \ No newline at end of file diff --git a/core-java-modules/core-java-streams-5/src/main/java/com/baeldung/streams/partitioning/PartitionStream.java b/core-java-modules/core-java-streams-5/src/main/java/com/baeldung/streams/partitioning/PartitionStream.java new file mode 100644 index 0000000000..1ef3fa7707 --- /dev/null +++ b/core-java-modules/core-java-streams-5/src/main/java/com/baeldung/streams/partitioning/PartitionStream.java @@ -0,0 +1,90 @@ +package com.baeldung.streams.partitioning; + +import java.util.ArrayList; +import java.util.List; +import java.util.function.BiConsumer; +import java.util.function.BinaryOperator; +import java.util.function.Function; +import java.util.function.Supplier; +import java.util.stream.Collector; +import java.util.stream.Collectors; +import java.util.stream.IntStream; +import java.util.stream.Stream; + +import com.google.common.collect.Iterables; + +public class PartitionStream { + + public static Stream> partitionList(List source, int batchSize) { + if (batchSize <= 0) { + throw new IllegalArgumentException(String.format("Expected the batchSize to be greater than ZERO, actual value was: %s", batchSize)); + } + if (source.isEmpty()) { + return Stream.empty(); + } + int nrOfFullBatches = (source.size() - 1) / batchSize; + return IntStream.rangeClosed(0, nrOfFullBatches) + .mapToObj(batch -> { + int startIndex = batch * batchSize; + int endIndex = (batch == nrOfFullBatches) ? source.size() : (batch + 1) * batchSize; + return source.subList(startIndex, endIndex); + }); + } + + public static Iterable> partitionUsingGuava(Stream source, int batchSize) { + return Iterables.partition(source::iterator, batchSize); + } + + public static List> partitionStream(Stream source, int batchSize) { + return source.collect(partitionBySize(batchSize, Collectors.toList())); + } + + public static Collector partitionBySize(int batchSize, Collector, A, R> downstream) { + Supplier> supplier = () -> new Accumulator<>( + batchSize, + downstream.supplier().get(), + downstream.accumulator()::accept + ); + + BiConsumer, T> accumulator = (acc, value) -> acc.add(value); + + BinaryOperator> combiner = (acc1, acc2) -> acc1.combine(acc2, downstream.combiner()); + + Function, R> finisher = acc -> { + if (!acc.values.isEmpty()) { + downstream.accumulator().accept(acc.downstreamAccumulator, acc.values); + } + return downstream.finisher().apply(acc.downstreamAccumulator); + }; + + return Collector.of(supplier, accumulator, combiner, finisher, Collector.Characteristics.UNORDERED); + } + + static class Accumulator { + private final List values = new ArrayList<>(); + private final int batchSize; + private A downstreamAccumulator; + private final BiConsumer> batchFullListener; + + Accumulator(int batchSize, A accumulator, BiConsumer> onBatchFull) { + this.batchSize = batchSize; + this.downstreamAccumulator = accumulator; + this.batchFullListener = onBatchFull; + } + + void add(T value) { + values.add(value); + if (values.size() == batchSize) { + batchFullListener.accept(downstreamAccumulator, new ArrayList<>(values)); + values.clear(); + } + } + + Accumulator combine(Accumulator other, BinaryOperator accumulatorCombiner) { + this.downstreamAccumulator = accumulatorCombiner.apply(downstreamAccumulator, other.downstreamAccumulator); + other.values.forEach(this::add); + return this; + } + } + +} diff --git a/core-java-modules/core-java-streams-5/src/test/java/com/baeldung/partitioning/PartitionStreamsUnitTest.java b/core-java-modules/core-java-streams-5/src/test/java/com/baeldung/partitioning/PartitionStreamsUnitTest.java new file mode 100644 index 0000000000..df75e69783 --- /dev/null +++ b/core-java-modules/core-java-streams-5/src/test/java/com/baeldung/partitioning/PartitionStreamsUnitTest.java @@ -0,0 +1,82 @@ +package com.baeldung.partitioning; + +import static com.baeldung.streams.partitioning.PartitionStream.partitionList; +import static com.baeldung.streams.partitioning.PartitionStream.partitionStream; +import static com.baeldung.streams.partitioning.PartitionStream.partitionUsingGuava; +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.Assertions.atIndex; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.stream.Stream; + +import org.junit.jupiter.api.Test; + +public class PartitionStreamsUnitTest { + + @Test + void whenPartitionList_thenReturnThreeSubLists() { + List source = List.of(1, 2, 3, 4, 5, 6, 7, 8); + + Stream> result = partitionList(source, 3); + + assertThat(result) + .containsExactlyInAnyOrder( + List.of(1, 2, 3), + List.of(4, 5, 6), + List.of(7, 8) + ); + } + + @Test + void whenPartitionEmptyList_thenReturnEmptyStream() { + Stream> result = partitionList(Collections.emptyList(), 3); + + assertThat(result).isEmpty(); + } + + @Test + void whenPartitionListWithNegativeBatchSize_thenThrowException() { + assertThatThrownBy(() -> partitionList(List.of(1,2,3), -1)) + .isInstanceOf(IllegalArgumentException.class) + .hasMessage("Expected the batchSize to be greater than ZERO, actual value was: -1"); + } + + @Test + void whenPartitionParallelStream_thenReturnThreeSubLists() { + Stream source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel(); + + List> result = partitionStream(source, 3); + + assertThat(result) + .hasSize(3) + .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(0)) + .satisfies(batch -> assertThat(batch).hasSize(3), atIndex(1)) + .satisfies(batch -> assertThat(batch).hasSize(2), atIndex(2)); + } + + @Test + void whenPartitionEmptyParallelStream_thenReturnEmptyList() { + Stream source = Stream.empty().parallel(); + + List> result = partitionStream(source, 3); + + assertThat(result).isEmpty(); + } + + @Test + void whenPartitionParallelStreamWithGuava_thenReturnThreeSubLists() { + Stream source = Stream.of(1, 2, 3, 4, 5, 6, 7, 8).parallel(); + + Iterable> result = partitionUsingGuava(source, 3); + + assertThat(result) + .map(ArrayList::new) + .hasSize(3) + .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(0)) + .satisfies(batch -> assertThat(batch).asList().hasSize(3), atIndex(1)) + .satisfies(batch -> assertThat(batch).asList().hasSize(2), atIndex(2)); + } +}