BAEL-4886 Flux generate vs create

This commit is contained in:
Somnath Musib 2022-04-10 08:12:09 +05:30
parent 69e04d5ac0
commit 859e4bac77
3 changed files with 83 additions and 0 deletions

View File

@ -0,0 +1,15 @@
package com.baeldung.reactor.creation;
import java.util.List;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
public class CharacterCreator {
public Consumer<List<Character>> consumer;
public Flux<Character> createCharacterSequence() {
return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
}
}

View File

@ -0,0 +1,18 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
public class CharacterGenerator {
public Flux<Character> generateCharacters() {
return Flux.generate(() -> 97, (state, sink) -> {
char value = (char) state.intValue();
sink.next(value);
if (value == 'z') {
sink.complete();
}
return state + 1;
});
}
}

View File

@ -0,0 +1,50 @@
package com.baeldung.reactor.creation;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class CharacterUnitTest {
@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
CharacterGenerator characterGenerator = new CharacterGenerator();
Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);
StepVerifier.create(characterFlux)
.expectNext('a', 'b', 'c')
.expectComplete()
.verify();
}
@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
CharacterGenerator characterGenerator = new CharacterGenerator();
List<Character> sequence1 = characterGenerator.generateCharacters().take(3).collectList().block();
List<Character> sequence2 = characterGenerator.generateCharacters().take(2).collectList().block();
CharacterCreator characterCreator = new CharacterCreator();
Thread producingThread1 = new Thread(
() -> characterCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
() -> characterCreator.consumer.accept(sequence2)
);
List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);
producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
}
}