* Fix a division method mistake

* BAEL-2280 Programmatically Creating Sequences with Project Reactor

* BAEL-2280 Update

* BAEL-2280 Update
This commit is contained in:
nguyennamthai 2019-03-13 23:06:11 +07:00 committed by Josh Cummings
parent 47e617fc39
commit 895aad42f9
6 changed files with 163 additions and 8 deletions

View File

@ -16,7 +16,13 @@
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
<version>${reactor-core.version}</version>
<version>${reactor.version}</version>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
@ -24,16 +30,10 @@
<version>${assertj.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>${reactor-core.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<properties>
<reactor-core.version>3.1.3.RELEASE</reactor-core.version>
<reactor.version>3.2.6.RELEASE</reactor.version>
<assertj.version>3.6.1</assertj.version>
</properties>

View File

@ -0,0 +1,27 @@
package com.baeldung.reactor.creation;
public class FibonacciState {
private int former;
private int latter;
public FibonacciState(int former, int latter) {
this.former = former;
this.latter = latter;
}
public int getFormer() {
return former;
}
public void setFormer(int former) {
this.former = former;
}
public int getLatter() {
return latter;
}
public void setLatter(int latter) {
this.latter = latter;
}
}

View File

@ -0,0 +1,14 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
import java.util.List;
import java.util.function.Consumer;
public class SequenceCreator {
public Consumer<List<Integer>> consumer;
public Flux<Integer> createNumberSequence() {
return Flux.create(sink -> SequenceCreator.this.consumer = items -> items.forEach(sink::next));
}
}

View File

@ -0,0 +1,31 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuples;
public class SequenceGenerator {
public Flux<Integer> generateFibonacciWithTuples() {
return Flux.generate(
() -> Tuples.of(0, 1),
(state, sink) -> {
sink.next(state.getT1());
return Tuples.of(state.getT2(), state.getT1() + state.getT2());
}
);
}
public Flux<Integer> generateFibonacciWithCustomClass(int limit) {
return Flux.generate(
() -> new FibonacciState(0, 1),
(state, sink) -> {
sink.next(state.getFormer());
if (state.getLatter() > limit) {
sink.complete();
}
int temp = state.getFormer();
state.setFormer(state.getLatter());
state.setLatter(temp + state.getLatter());
return state;
});
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.reactor.creation;
import reactor.core.publisher.Flux;
public class SequenceHandler {
public Flux<Integer> handleIntegerSequence(Flux<Integer> sequence) {
return sequence.handle((number, sink) -> {
if (number % 2 == 0) {
sink.next(number / 2);
}
});
}
}

View File

@ -0,0 +1,70 @@
package com.baeldung.reactor.creation;
import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.test.StepVerifier;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class SequenceUnitTest {
@Test
public void whenGeneratingNumbersWithTuplesState_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> fibonacciFlux = sequenceGenerator.generateFibonacciWithTuples().take(5);
StepVerifier.create(fibonacciFlux)
.expectNext(0, 1, 1, 2, 3)
.expectComplete()
.verify();
}
@Test
public void whenGeneratingNumbersWithCustomClass_thenFibonacciSequenceIsProduced() {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
StepVerifier.create(sequenceGenerator.generateFibonacciWithCustomClass(10))
.expectNext(0, 1, 1, 2, 3, 5, 8)
.expectComplete()
.verify();
}
@Test
public void whenCreatingNumbers_thenSequenceIsProducedAsynchronously() throws InterruptedException {
SequenceGenerator sequenceGenerator = new SequenceGenerator();
List<Integer> sequence1 = sequenceGenerator.generateFibonacciWithTuples().take(3).collectList().block();
List<Integer> sequence2 = sequenceGenerator.generateFibonacciWithTuples().take(4).collectList().block();
SequenceCreator sequenceCreator = new SequenceCreator();
Thread producingThread1 = new Thread(
() -> sequenceCreator.consumer.accept(sequence1)
);
Thread producingThread2 = new Thread(
() -> sequenceCreator.consumer.accept(sequence2)
);
List<Integer> consolidated = new ArrayList<>();
sequenceCreator.createNumberSequence().subscribe(consolidated::add);
producingThread1.start();
producingThread2.start();
producingThread1.join();
producingThread2.join();
assertThat(consolidated).containsExactlyInAnyOrder(0, 1, 1, 0, 1, 1, 2);
}
@Test
public void whenHandlingNumbers_thenSequenceIsMappedAndFiltered() {
SequenceHandler sequenceHandler = new SequenceHandler();
SequenceGenerator sequenceGenerator = new SequenceGenerator();
Flux<Integer> sequence = sequenceGenerator.generateFibonacciWithTuples().take(10);
StepVerifier.create(sequenceHandler.handleIntegerSequence(sequence))
.expectNext(0, 1, 4, 17)
.expectComplete()
.verify();
}
}