Merge pull request #12042 from musibs/BAEL-4886

BAEL-4886 Flux generate vs create
This commit is contained in:
Loredana Crusoveanu 2022-06-20 21:21:26 +03:00 committed by GitHub
commit 87c16c874f
4 changed files with 83 additions and 1 deletions

View File

@ -35,7 +35,7 @@
</dependencies>
<properties>
<reactor.version>3.4.9</reactor.version>
<reactor.version>3.4.17</reactor.version>
</properties>
</project>

View File

@ -0,0 +1,14 @@
package com.baeldung.reactor.generate.create;
import java.util.List;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
public class CharacterCreator {
public Consumer<List<Character>> consumer;
public Flux<Character> createCharacterSequence() {
return Flux.create(sink -> CharacterCreator.this.consumer = items -> items.forEach(sink::next));
}
}

View File

@ -0,0 +1,16 @@
package com.baeldung.reactor.generate.create;
import reactor.core.publisher.Flux;
public class CharacterGenerator {
public Flux<Character> generateCharacters() {
return Flux.generate(() -> 97, (state, sink) -> {
char value = (char) state.intValue();
sink.next(value);
if (value == 'z') {
sink.complete();
}
return state + 1;
});
}
}

View File

@ -0,0 +1,52 @@
package com.baeldung.reactor.generate.create;
import static org.assertj.core.api.Assertions.assertThat;
import java.util.ArrayList;
import java.util.List;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
public class CharacterUnitTest {
@Test
public void whenGeneratingCharacters_thenCharactersAreProduced() {
CharacterGenerator characterGenerator = new CharacterGenerator();
Flux<Character> characterFlux = characterGenerator.generateCharacters().take(3);
StepVerifier.create(characterFlux)
.expectNext('a', 'b', 'c')
.expectComplete()
.verify();
}
@Test
public void whenCreatingCharactersWithMultipleThreads_thenSequenceIsProducedAsynchronously() throws InterruptedException {
CharacterGenerator characterGenerator = new CharacterGenerator();
List<Character> sequence1 = characterGenerator.generateCharacters()
.take(3)
.collectList()
.block();
List<Character> sequence2 = characterGenerator.generateCharacters()
.take(2)
.collectList()
.block();
CharacterCreator characterCreator = new CharacterCreator();
Thread producerThread1 = new Thread(
() -> characterCreator.consumer.accept(sequence1));
Thread producerThread2 = new Thread(
() -> characterCreator.consumer.accept(sequence2));
List<Character> consolidated = new ArrayList<>();
characterCreator.createCharacterSequence().subscribe(consolidated::add);
producerThread1.start();
producerThread2.start();
producerThread1.join();
producerThread2.join();
assertThat(consolidated).containsExactlyInAnyOrder('a', 'b', 'c', 'a', 'b');
}
}