diff --git a/reactor-core/pom.xml b/reactor-core/pom.xml index db9550df7b..66c634e113 100644 --- a/reactor-core/pom.xml +++ b/reactor-core/pom.xml @@ -16,7 +16,13 @@ io.projectreactor reactor-core - ${reactor-core.version} + ${reactor.version} + + + io.projectreactor + reactor-test + ${reactor.version} + test org.assertj @@ -24,16 +30,10 @@ ${assertj.version} test - - io.projectreactor - reactor-test - ${reactor-core.version} - test - - 3.1.3.RELEASE + 3.2.6.RELEASE 3.6.1 diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/FibonacciState.java b/reactor-core/src/main/java/com/baeldung/reactor/creation/FibonacciState.java new file mode 100644 index 0000000000..291002e1f9 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/creation/FibonacciState.java @@ -0,0 +1,27 @@ +package com.baeldung.reactor.creation; + +public class FibonacciState { + private int former; + private int latter; + + public FibonacciState(int former, int latter) { + this.former = former; + this.latter = latter; + } + + public int getFormer() { + return former; + } + + public void setFormer(int former) { + this.former = former; + } + + public int getLatter() { + return latter; + } + + public void setLatter(int latter) { + this.latter = latter; + } +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceCreator.java b/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceCreator.java new file mode 100644 index 0000000000..fb53b1cab1 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceCreator.java @@ -0,0 +1,14 @@ +package com.baeldung.reactor.creation; + +import reactor.core.publisher.Flux; + +import java.util.List; +import java.util.function.Consumer; + +public class SequenceCreator { + public Consumer> consumer; + + public Flux createNumberSequence() { + return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next)); + } +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceGenerator.java b/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceGenerator.java new file mode 100644 index 0000000000..44d83d06bf --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceGenerator.java @@ -0,0 +1,31 @@ +package com.baeldung.reactor.creation; + +import reactor.core.publisher.Flux; +import reactor.util.function.Tuples; + +public class SequenceGenerator { + public Flux generateFibonacciWithTuples() { + return Flux.generate( + () -> Tuples.of(0, 1), + (state, sink) -> { + sink.next(state.getT1()); + return Tuples.of(state.getT2(), state.getT1() + state.getT2()); + } + ); + } + + public Flux generateFibonacciWithCustomClass(int limit) { + return Flux.generate( + () -> new FibonacciState(0, 1), + (state, sink) -> { + sink.next(state.getFormer()); + if (state.getLatter() > limit) { + sink.complete(); + } + int temp = state.getFormer(); + state.setFormer(state.getLatter()); + state.setLatter(temp + state.getLatter()); + return state; + }); + } +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceHandler.java b/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceHandler.java new file mode 100644 index 0000000000..a9611c63e2 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/creation/SequenceHandler.java @@ -0,0 +1,13 @@ +package com.baeldung.reactor.creation; + +import reactor.core.publisher.Flux; + +public class SequenceHandler { + public Flux handleIntegerSequence(Flux sequence) { + return sequence.handle((number, sink) -> { + if (number % 2 == 0) { + sink.next(number / 2); + } + }); + } +} diff --git a/reactor-core/src/test/java/com/baeldung/reactor/creation/SequenceUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/creation/SequenceUnitTest.java new file mode 100644 index 0000000000..78af1d2478 --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/creation/SequenceUnitTest.java @@ -0,0 +1,70 @@ +package com.baeldung.reactor.creation; + +import org.junit.Test; +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class SequenceUnitTest { + @Test + public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() { + SequenceGenerator sequenceGenerator = new SequenceGenerator(); + Flux fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5); + + StepVerifier.create(fibonacciFlux) + .expectNext(0, 1, 1, 2, 3) + .expectComplete() + .verify(); + } + + @Test + public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() { + SequenceGenerator sequenceGenerator = new SequenceGenerator(); + + StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10)) + .expectNext(0, 1, 1, 2, 3, 5, 8) + .expectComplete() + .verify(); + } + + @Test + public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException { + SequenceGenerator sequenceGenerator = new SequenceGenerator(); + List sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block(); + List sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block(); + + SequenceCreator sequenceCreator = new SequenceCreator(); + Thread producingThread1 = new Thread( + () -> sequenceCreator.consumer.accept(sequence1) + ); + Thread producingThread2 = new Thread( + () -> sequenceCreator.consumer.accept(sequence2) + ); + + List consolidated = new ArrayList<>(); + sequenceCreator.createNumberSequence().subscribe(consolidated::add); + + producingThread1.start(); + producingThread2.start(); + producingThread1.join(); + producingThread2.join(); + + assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2); + } + + @Test + public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() { + SequenceHandler sequenceHandler = new SequenceHandler(); + SequenceGenerator sequenceGenerator = new SequenceGenerator(); + Flux sequence = sequenceGenerator.generateFibonacciWithTuples().take(10); + + StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence)) + .expectNext(0, 1, 4, 17) + .expectComplete() + .verify(); + } +}