Renamed tests and added connectableFlux test
This commit is contained in:
parent
6bbb4cfef4
commit
13b639cdff
|
@ -24,29 +24,36 @@
|
|||
<dependency>
|
||||
<groupId>io.projectreactor</groupId>
|
||||
<artifactId>reactor-core</artifactId>
|
||||
<version>3.0.4.RELEASE</version>
|
||||
<version>${reactor-core.version}</version>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>junit</groupId>
|
||||
<artifactId>junit</artifactId>
|
||||
<version>4.12</version>
|
||||
<version>${junit.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>org.assertj</groupId>
|
||||
<artifactId>assertj-core</artifactId>
|
||||
<version>3.6.1</version>
|
||||
<version>${assertj.version}</version>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
|
||||
<dependency>
|
||||
<groupId>ch.qos.logback</groupId>
|
||||
<artifactId>logback-classic</artifactId>
|
||||
<version>1.1.3</version>
|
||||
<version>${logback.version}</version>
|
||||
</dependency>
|
||||
|
||||
</dependencies>
|
||||
|
||||
<properties>
|
||||
<reactor-core.version>3.0.4.RELEASE</reactor-core.version>
|
||||
<junit.version>4.12</junit.version>
|
||||
<assertj.version>3.6.1</assertj.version>
|
||||
<logback.version>1.1.3</logback.version>
|
||||
</properties>
|
||||
|
||||
</project>
|
||||
|
|
|
@ -5,36 +5,33 @@ import org.reactivestreams.Subscriber;
|
|||
import org.reactivestreams.Subscription;
|
||||
import reactor.core.publisher.ConnectableFlux;
|
||||
import reactor.core.publisher.Flux;
|
||||
import reactor.core.publisher.FluxSink;
|
||||
import reactor.core.scheduler.Schedulers;
|
||||
|
||||
import java.time.Duration;
|
||||
import java.time.LocalDateTime;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import static java.time.Duration.ofSeconds;
|
||||
import static java.util.stream.Collectors.toList;
|
||||
import static org.assertj.core.api.Assertions.assertThat;
|
||||
|
||||
public class ReactorTest {
|
||||
|
||||
@Test
|
||||
public void givenFlux_whenSubscribing_shouldStream() throws InterruptedException {
|
||||
public void givenFlux_whenSubscribing_thenStream() throws InterruptedException {
|
||||
|
||||
List<Integer> elements = new ArrayList<>();
|
||||
|
||||
Flux.just(1, 2, 3, 4)
|
||||
.log()
|
||||
.map(i -> i * 2)
|
||||
.map(i -> {
|
||||
System.out.println(i + ":" + Thread.currentThread());
|
||||
return i * 2;
|
||||
})
|
||||
.subscribe(elements::add);
|
||||
|
||||
assertThat(elements).containsExactly(2, 4, 6, 8);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenFlux_whenZipping_shouldCombine() {
|
||||
public void givenFlux_whenZipping_thenCombine() {
|
||||
List<String> elements = new ArrayList<>();
|
||||
|
||||
Flux.just(1, 2, 3, 4)
|
||||
|
@ -51,7 +48,7 @@ public class ReactorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void givenFlux_whenApplyingBackPressure_shouldPushLessElements() throws InterruptedException {
|
||||
public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() throws InterruptedException {
|
||||
|
||||
List<Integer> elements = new ArrayList<>();
|
||||
|
||||
|
@ -92,18 +89,34 @@ public class ReactorTest {
|
|||
}
|
||||
|
||||
@Test
|
||||
public void givenFlux_whenInParalle_shouldSubscribeInDifferentThreads() throws InterruptedException {
|
||||
List<Integer> elements = new ArrayList<>();
|
||||
public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException {
|
||||
List<String> threadNames = new ArrayList<>();
|
||||
|
||||
Flux.just(1, 2, 3, 4)
|
||||
.log()
|
||||
.map(i -> i * 2)
|
||||
.map(i -> Thread.currentThread().getName())
|
||||
.subscribeOn(Schedulers.parallel())
|
||||
.subscribe(elements::add);
|
||||
.subscribe(threadNames::add);
|
||||
|
||||
Thread.sleep(1000);
|
||||
|
||||
assertThat(elements).containsExactly(2, 4, 6, 8);
|
||||
assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void givenConnectableFlux_thenShouldStream_onConnect() {
|
||||
|
||||
List<Integer> elements = new ArrayList<>();
|
||||
|
||||
final ConnectableFlux<Integer> publish = Flux.just(1, 2, 3, 4).publish();
|
||||
|
||||
publish.subscribe(elements::add);
|
||||
|
||||
assertThat(elements).isEmpty();
|
||||
|
||||
publish.connect();
|
||||
|
||||
assertThat(elements).containsExactly(1, 2, 3, 4);
|
||||
}
|
||||
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue