From 859e4bac776d7b522549c2d4a8b452f5179ce1d6 Mon Sep 17 00:00:00 2001 From: Somnath Musib <7885767+musibs@users.noreply.github.com> Date: Sun, 10 Apr 2022 08:12:09 +0530 Subject: [PATCH 1/5] BAEL-4886 Flux generate vs create --- .../reactor/creation/CharacterCreator.java | 15 ++++++ .../reactor/creation/CharacterGenerator.java | 18 +++++++ .../reactor/creation/CharacterUnitTest.java | 50 +++++++++++++++++++ 3 files changed, 83 insertions(+) create mode 100644 reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java create mode 100644 reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java create mode 100644 reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java b/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java new file mode 100644 index 0000000000..382615b01c --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java @@ -0,0 +1,15 @@ +package com.baeldung.reactor.creation; + +import java.util.List; +import java.util.function.Consumer; + +import reactor.core.publisher.Flux; + +public class CharacterCreator { + + public Consumer> consumer; + + public Flux createCharacterSequence() { + return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next)); + } +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java b/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java new file mode 100644 index 0000000000..a22ea02bba --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java @@ -0,0 +1,18 @@ +package com.baeldung.reactor.creation; + +import reactor.core.publisher.Flux; + +public class CharacterGenerator { + + public Flux generateCharacters() { + + return Flux.generate(() -> 97, (state, sink) -> { + char value = (char) state.intValue(); + sink.next(value); + if (value == 'z') { + sink.complete(); + } + return state + 1; + }); + } +} diff --git a/reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java new file mode 100644 index 0000000000..2609f1f403 --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java @@ -0,0 +1,50 @@ +package com.baeldung.reactor.creation; + +import static org.assertj.core.api.Assertions.assertThat; + +import java.util.ArrayList; +import java.util.List; + +import org.junit.Test; + +import reactor.core.publisher.Flux; +import reactor.test.StepVerifier; + +public class CharacterUnitTest { + + @Test + public void whenGeneratingCharacters_thenCharactersAreProduced() { + CharacterGenerator characterGenerator = new CharacterGenerator(); + Flux characterFlux = characterGenerator.generateCharacters().take(3); + + StepVerifier.create(characterFlux) + .expectNext('a', 'b', 'c') + .expectComplete() + .verify(); + } + + @Test + public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException { + CharacterGenerator characterGenerator = new CharacterGenerator(); + List sequence1 = characterGenerator.generateCharacters().take(3).collectList().block(); + List sequence2 = characterGenerator.generateCharacters().take(2).collectList().block(); + + CharacterCreator characterCreator = new CharacterCreator(); + Thread producingThread1 = new Thread( + () -> characterCreator.consumer.accept(sequence1) + ); + Thread producingThread2 = new Thread( + () -> characterCreator.consumer.accept(sequence2) + ); + + List consolidated = new ArrayList<>(); + characterCreator.createCharacterSequence().subscribe(consolidated::add); + + producingThread1.start(); + producingThread2.start(); + producingThread1.join(); + producingThread2.join(); + + assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b'); + } +} From a341bb7c65588b56072ea0ac567b812b06397fdd Mon Sep 17 00:00:00 2001 From: Somnath Musib <7885767+musibs@users.noreply.github.com> Date: Sat, 16 Apr 2022 07:29:53 +0530 Subject: [PATCH 2/5] PR Review changes --- .../create}/CharacterCreator.java | 3 +-- .../create}/CharacterGenerator.java | 4 +-- .../create}/CharacterUnitTest.java | 26 ++++++++++++------- 3 files changed, 18 insertions(+), 15 deletions(-) rename reactor-core/src/main/java/com/baeldung/reactor/{creation => generate/create}/CharacterCreator.java (88%) rename reactor-core/src/main/java/com/baeldung/reactor/{creation => generate/create}/CharacterGenerator.java (88%) rename reactor-core/src/test/java/com/baeldung/reactor/{creation => generate/create}/CharacterUnitTest.java (76%) diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterCreator.java similarity index 88% rename from reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java rename to reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterCreator.java index 382615b01c..122e512e78 100644 --- a/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterCreator.java +++ b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterCreator.java @@ -1,4 +1,4 @@ -package com.baeldung.reactor.creation; +package com.baeldung.reactor.generate.create; import java.util.List; import java.util.function.Consumer; @@ -6,7 +6,6 @@ import java.util.function.Consumer; import reactor.core.publisher.Flux; public class CharacterCreator { - public Consumer> consumer; public Flux createCharacterSequence() { diff --git a/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterGenerator.java similarity index 88% rename from reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java rename to reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterGenerator.java index a22ea02bba..7951483dd9 100644 --- a/reactor-core/src/main/java/com/baeldung/reactor/creation/CharacterGenerator.java +++ b/reactor-core/src/main/java/com/baeldung/reactor/generate/create/CharacterGenerator.java @@ -1,11 +1,9 @@ -package com.baeldung.reactor.creation; +package com.baeldung.reactor.generate.create; import reactor.core.publisher.Flux; public class CharacterGenerator { - public Flux generateCharacters() { - return Flux.generate(() -> 97, (state, sink) -> { char value = (char) state.intValue(); sink.next(value); diff --git a/reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java similarity index 76% rename from reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java rename to reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java index 2609f1f403..e58ce40b82 100644 --- a/reactor-core/src/test/java/com/baeldung/reactor/creation/CharacterUnitTest.java +++ b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java @@ -1,4 +1,4 @@ -package com.baeldung.reactor.creation; +package com.baeldung.reactor.generate.create; import static org.assertj.core.api.Assertions.assertThat; @@ -11,7 +11,6 @@ import reactor.core.publisher.Flux; import reactor.test.StepVerifier; public class CharacterUnitTest { - @Test public void whenGeneratingCharacters_thenCharactersAreProduced() { CharacterGenerator characterGenerator = new CharacterGenerator(); @@ -26,24 +25,31 @@ public class CharacterUnitTest { @Test public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException { CharacterGenerator characterGenerator = new CharacterGenerator(); - List sequence1 = characterGenerator.generateCharacters().take(3).collectList().block(); - List sequence2 = characterGenerator.generateCharacters().take(2).collectList().block(); + List sequence1 = characterGenerator.generateCharacters() + .take(3) + .collectList() + .block(); + List sequence2 = characterGenerator.generateCharacters() + .take(2) + .collectList() + .block(); CharacterCreator characterCreator = new CharacterCreator(); - Thread producingThread1 = new Thread( + + Thread producerThread1 = new Thread( () -> characterCreator.consumer.accept(sequence1) ); - Thread producingThread2 = new Thread( + Thread producerThread2 = new Thread( () -> characterCreator.consumer.accept(sequence2) ); List consolidated = new ArrayList<>(); characterCreator.createCharacterSequence().subscribe(consolidated::add); - producingThread1.start(); - producingThread2.start(); - producingThread1.join(); - producingThread2.join(); + producerThread1.start(); + producerThread2.start(); + producerThread1.join(); + producerThread2.join(); assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b'); } From a811cb32cfa295e7ac3fa09a2bb774484360958b Mon Sep 17 00:00:00 2001 From: Somnath Musib <7885767+musibs@users.noreply.github.com> Date: Fri, 22 Apr 2022 19:46:53 +0530 Subject: [PATCH 3/5] Reactor dependency version changes --- reactor-core/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/reactor-core/pom.xml b/reactor-core/pom.xml index 39a66cee3e..e27a1a2845 100644 --- a/reactor-core/pom.xml +++ b/reactor-core/pom.xml @@ -35,7 +35,7 @@ - 3.4.9 + 3.4.17 \ No newline at end of file From 5e1fdd53f0f0d242ee100cb03d2a6f86f0263749 Mon Sep 17 00:00:00 2001 From: Somnath Musib <7885767+musibs@users.noreply.github.com> Date: Sat, 23 Apr 2022 14:35:49 +0530 Subject: [PATCH 4/5] Removed Trailing whiespaces and corrected the indentation --- .../generate/create/CharacterUnitTest.java | 15 +++++---------- 1 file changed, 5 insertions(+), 10 deletions(-) diff --git a/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java index e58ce40b82..93a65f4006 100644 --- a/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java +++ b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java @@ -17,11 +17,10 @@ public class CharacterUnitTest { Flux characterFlux = characterGenerator.generateCharacters().take(3); StepVerifier.create(characterFlux) - .expectNext('a', 'b', 'c') - .expectComplete() - .verify(); + .expectNext('a', 'b', 'c') + .expectComplete() + .verify(); } - @Test public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException { CharacterGenerator characterGenerator = new CharacterGenerator(); @@ -35,14 +34,10 @@ public class CharacterUnitTest { .block(); CharacterCreator characterCreator = new CharacterCreator(); - Thread producerThread1 = new Thread( - () -> characterCreator.consumer.accept(sequence1) - ); + () -> characterCreator.consumer.accept(sequence1)); Thread producerThread2 = new Thread( - () -> characterCreator.consumer.accept(sequence2) - ); - + () -> characterCreator.consumer.accept(sequence2)); List consolidated = new ArrayList<>(); characterCreator.createCharacterSequence().subscribe(consolidated::add); From 3d98763c50cdd129cbb1c06788e2c52c10c934b7 Mon Sep 17 00:00:00 2001 From: Somnath Musib <7885767+musibs@users.noreply.github.com> Date: Mon, 25 Apr 2022 10:14:51 +0530 Subject: [PATCH 5/5] Included space between methods --- .../com/baeldung/reactor/generate/create/CharacterUnitTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java index 93a65f4006..6822b25a7f 100644 --- a/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java +++ b/reactor-core/src/test/java/com/baeldung/reactor/generate/create/CharacterUnitTest.java @@ -21,6 +21,7 @@ public class CharacterUnitTest { .expectComplete() .verify(); } + @Test public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException { CharacterGenerator characterGenerator = new CharacterGenerator();