PR Review changes
This commit is contained in:
parent
859e4bac77
commit
a341bb7c65
@ -1,4 +1,4 @@
|
|||||||
package com.baeldung.reactor.creation;
|
package com.baeldung.reactor.generate.create;
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.function.Consumer;
|
import java.util.function.Consumer;
|
||||||
@ -6,7 +6,6 @@ import java.util.function.Consumer;
|
|||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
public class CharacterCreator {
|
public class CharacterCreator {
|
||||||
|
|
||||||
public Consumer<List<Character>> consumer;
|
public Consumer<List<Character>> consumer;
|
||||||
|
|
||||||
public Flux<Character> createCharacterSequence() {
|
public Flux<Character> createCharacterSequence() {
|
@ -1,11 +1,9 @@
|
|||||||
package com.baeldung.reactor.creation;
|
package com.baeldung.reactor.generate.create;
|
||||||
|
|
||||||
import reactor.core.publisher.Flux;
|
import reactor.core.publisher.Flux;
|
||||||
|
|
||||||
public class CharacterGenerator {
|
public class CharacterGenerator {
|
||||||
|
|
||||||
public Flux<Character> generateCharacters() {
|
public Flux<Character> generateCharacters() {
|
||||||
|
|
||||||
return Flux.generate(() -> 97, (state, sink) -> {
|
return Flux.generate(() -> 97, (state, sink) -> {
|
||||||
char value = (char) state.intValue();
|
char value = (char) state.intValue();
|
||||||
sink.next(value);
|
sink.next(value);
|
@ -1,4 +1,4 @@
|
|||||||
package com.baeldung.reactor.creation;
|
package com.baeldung.reactor.generate.create;
|
||||||
|
|
||||||
import static org.assertj.core.api.Assertions.assertThat;
|
import static org.assertj.core.api.Assertions.assertThat;
|
||||||
|
|
||||||
@ -11,7 +11,6 @@ import reactor.core.publisher.Flux;
|
|||||||
import reactor.test.StepVerifier;
|
import reactor.test.StepVerifier;
|
||||||
|
|
||||||
public class CharacterUnitTest {
|
public class CharacterUnitTest {
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void whenGeneratingCharacters_thenCharactersAreProduced() {
|
public void whenGeneratingCharacters_thenCharactersAreProduced() {
|
||||||
CharacterGenerator characterGenerator = new CharacterGenerator();
|
CharacterGenerator characterGenerator = new CharacterGenerator();
|
||||||
@ -26,24 +25,31 @@ public class CharacterUnitTest {
|
|||||||
@Test
|
@Test
|
||||||
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
|
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
|
||||||
CharacterGenerator characterGenerator = new CharacterGenerator();
|
CharacterGenerator characterGenerator = new CharacterGenerator();
|
||||||
List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
|
List<Character> sequence1 = characterGenerator.generateCharacters()
|
||||||
List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
|
.take(3)
|
||||||
|
.collectList()
|
||||||
|
.block();
|
||||||
|
List<Character> sequence2 = characterGenerator.generateCharacters()
|
||||||
|
.take(2)
|
||||||
|
.collectList()
|
||||||
|
.block();
|
||||||
|
|
||||||
CharacterCreator characterCreator = new CharacterCreator();
|
CharacterCreator characterCreator = new CharacterCreator();
|
||||||
Thread producingThread1 = new Thread(
|
|
||||||
|
Thread producerThread1 = new Thread(
|
||||||
() -> characterCreator.consumer.accept(sequence1)
|
() -> characterCreator.consumer.accept(sequence1)
|
||||||
);
|
);
|
||||||
Thread producingThread2 = new Thread(
|
Thread producerThread2 = new Thread(
|
||||||
() -> characterCreator.consumer.accept(sequence2)
|
() -> characterCreator.consumer.accept(sequence2)
|
||||||
);
|
);
|
||||||
|
|
||||||
List<Character> consolidated = new ArrayList<>();
|
List<Character> consolidated = new ArrayList<>();
|
||||||
characterCreator.createCharacterSequence().subscribe(consolidated::add);
|
characterCreator.createCharacterSequence().subscribe(consolidated::add);
|
||||||
|
|
||||||
producingThread1.start();
|
producerThread1.start();
|
||||||
producingThread2.start();
|
producerThread2.start();
|
||||||
producingThread1.join();
|
producerThread1.join();
|
||||||
producingThread2.join();
|
producerThread2.join();
|
||||||
|
|
||||||
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
|
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
|
||||||
}
|
}
|
Loading…
x
Reference in New Issue
Block a user