flux generating data on interval

This commit is contained in:
Eugen Paraschiv 2017-12-13 15:56:37 +02:00
parent df8b6bea54
commit c68acf03d9
2 changed files with 34 additions and 6 deletions

View File

@ -8,25 +8,24 @@ import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController; import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono; import reactor.core.publisher.Mono;
@RestController("/foos") @RestController
public class FooReactiveController { public class FooReactiveController {
@GetMapping("/{id}") @GetMapping("/foos/{id}")
public Mono<Foo> getFoo(@PathVariable("id") long id) { public Mono<Foo> getFoo(@PathVariable("id") long id) {
return Mono.just(new Foo(id, randomAlphabetic(6))); return Mono.just(new Foo(id, randomAlphabetic(6)));
} }
@GetMapping("/") @GetMapping("/foos")
public Flux<Foo> getAllFoos() { public Flux<Foo> getAllFoos() {
final ConnectableFlux<Foo> flux = Flux.<Foo> create(fluxSink -> { final Flux<Foo> flux = Flux.<Foo> create(fluxSink -> {
while (true) { while (true) {
fluxSink.next(new Foo(System.currentTimeMillis(), randomAlphabetic(6))); fluxSink.next(new Foo(System.currentTimeMillis(), randomAlphabetic(6)));
} }
}).sample(Duration.ofSeconds(1)).publish(); }).sample(Duration.ofSeconds(1)).log();
return flux; return flux;
} }

View File

@ -0,0 +1,29 @@
package com.baeldung.reactive;
import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic;
import static org.junit.Assert.assertNotNull;
import java.time.Duration;
import org.junit.jupiter.api.Test;
import com.baeldung.reactive.controller.Foo;
import reactor.core.publisher.Flux;
public class FluxUnitTest {
@Test
public void whenFluxIsConstructed_thenCorrect() {
final Flux<Foo> flux = Flux.<Foo> create(fluxSink -> {
while (true) {
fluxSink.next(new Foo(System.currentTimeMillis(), randomAlphabetic(6)));
}
}).sample(Duration.ofSeconds(1)).log();
flux.subscribe();
assertNotNull(flux);
}
}