diff --git a/reactor-core/pom.xml b/reactor-core/pom.xml index 39a66cee3e..e27a1a2845 100644 --- a/reactor-core/pom.xml +++ b/reactor-core/pom.xml @@ -35,7 +35,7 @@ - 3.4.9 + 3.4.17 \ No newline at end of file diff --git a/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterCreator.java b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterCreator.java new file mode 100644 index 0000000000..122e512e78 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterCreator.java @@ -0,0 +1,14 @@ +package com.baeldung.reactor.generate.create; + +import java.util.List; +import java.util.function.Consumer; + +import reactor.core.publisher.Flux; + +public class CharacterCreator { + public Consumer> consumer; + + public Flux createCharacterSequence() { + return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next)); + } +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterGenerator.java b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterGenerator.java new file mode 100644 index 0000000000..7951483dd9 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterGenerator.java @@ -0,0 +1,16 @@ +package com.baeldung.reactor.generate.create; + +import reactor.core.publisher.Flux; + +public class CharacterGenerator { + public Flux generateCharacters() { + return Flux.generate(() -> 97, (state, sink) -> { + char value = (char) state.intValue(); + sink.next(value); + if (value == 'z') { + sink.complete(); + } + return state + 1; + }); + } +} diff --git a/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java new file mode 100644 index 0000000000..6822b25a7f --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java @@ -0,0 +1,52 @@ +package com.baeldung.reactor.generate.create; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +public class CharacterUnitTest { + @Test + public void whenGeneratingCharacters_thenCharactersAreProduced() { + CharacterGenerator characterGenerator = new CharacterGenerator(); + Flux characterFlux = characterGenerator.generateCharacters().take(3); + + StepVerifier.create(characterFlux) + .expectNext('a', 'b', 'c') + .expectComplete() + .verify(); + } + + @Test + public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException { + CharacterGenerator characterGenerator = new CharacterGenerator(); + List sequence1 = characterGenerator.generateCharacters() + .take(3) + .collectList() + .block(); + List sequence2 = characterGenerator.generateCharacters() + .take(2) + .collectList() + .block(); + + CharacterCreator characterCreator = new CharacterCreator(); + Thread producerThread1 = new Thread( + () -> characterCreator.consumer.accept(sequence1)); + Thread producerThread2 = new Thread( + () -> characterCreator.consumer.accept(sequence2)); + List consolidated = new ArrayList<>(); + characterCreator.createCharacterSequence().subscribe(consolidated::add); + + producerThread1.start(); + producerThread2.start(); + producerThread1.join(); + producerThread2.join(); + + assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b'); + } +}