From c68acf03d9dd803c4f027fffa71574f7cb6d3906 Mon Sep 17 00:00:00 2001 From: Eugen Paraschiv Date: Wed, 13 Dec 2017 15:56:37 +0200 Subject: [PATCH] flux generating data on interval --- .../controller/FooReactiveController.java | 11 ++++--- .../com/baeldung/reactive/FluxUnitTest.java | 29 +++++++++++++++++++ 2 files changed, 34 insertions(+), 6 deletions(-) create mode 100644 spring-5-reactive/src/test/java/com/baeldung/reactive/FluxUnitTest.java diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/FooReactiveController.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/FooReactiveController.java index d82692619d..1115036ad3 100644 --- a/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/FooReactiveController.java +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/FooReactiveController.java @@ -8,25 +8,24 @@ import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RestController; -import reactor.core.publisher.ConnectableFlux; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -@RestController("/foos") +@RestController public class FooReactiveController { - @GetMapping("/{id}") + @GetMapping("/foos/{id}") public Mono getFoo(@PathVariable("id") long id) { return Mono.just(new Foo(id, randomAlphabetic(6))); } - @GetMapping("/") + @GetMapping("/foos") public Flux getAllFoos() { - final ConnectableFlux flux = Flux. create(fluxSink -> { + final Flux flux = Flux. create(fluxSink -> { while (true) { fluxSink.next(new Foo(System.currentTimeMillis(), randomAlphabetic(6))); } - }).sample(Duration.ofSeconds(1)).publish(); + }).sample(Duration.ofSeconds(1)).log(); return flux; } diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/FluxUnitTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/FluxUnitTest.java new file mode 100644 index 0000000000..5499e72877 --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/FluxUnitTest.java @@ -0,0 +1,29 @@ +package com.baeldung.reactive; + +import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; +import static org.junit.Assert.assertNotNull; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; + +import com.baeldung.reactive.controller.Foo; + +import reactor.core.publisher.Flux; + +public class FluxUnitTest { + + @Test + public void whenFluxIsConstructed_thenCorrect() { + final Flux flux = Flux. create(fluxSink -> { + while (true) { + fluxSink.next(new Foo(System.currentTimeMillis(), randomAlphabetic(6))); + } + }).sample(Duration.ofSeconds(1)).log(); + + flux.subscribe(); + + assertNotNull(flux); + } + +}