From 487557faf23383906909830070dc2c71cd32f628 Mon Sep 17 00:00:00 2001 From: Krzysztof Majewski Date: Wed, 4 Jul 2018 12:32:22 +0200 Subject: [PATCH] A short example of real-time event streaming using Spring Webflux --- .../client/CpuUsageEventConsumer.java | 31 ++++++++++++++++++ .../controller/CpuUsageEventProducer.java | 24 ++++++++++++++ .../reactive/model/CpuUsageEvent.java | 17 ++++++++++ .../com/baeldung/reactive/utils/CpuUtils.java | 14 ++++++++ .../CpuUsageEventProducerIntegrationTest.java | 32 +++++++++++++++++++ .../baeldung/reactive/utils/CpuUtilsTest.java | 24 ++++++++++++++ 6 files changed, 142 insertions(+) create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/client/CpuUsageEventConsumer.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/controller/CpuUsageEventProducer.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/model/CpuUsageEvent.java create mode 100644 spring-5-reactive/src/main/java/com/baeldung/reactive/utils/CpuUtils.java create mode 100644 spring-5-reactive/src/test/java/com/baeldung/reactive/controller/CpuUsageEventProducerIntegrationTest.java create mode 100644 spring-5-reactive/src/test/java/com/baeldung/reactive/utils/CpuUtilsTest.java diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/client/CpuUsageEventConsumer.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/client/CpuUsageEventConsumer.java new file mode 100644 index 0000000000..3b9fbf7838 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/client/CpuUsageEventConsumer.java @@ -0,0 +1,31 @@ +package com.baeldung.reactive.client; + +import com.baeldung.reactive.model.CpuUsageEvent; +import org.springframework.boot.CommandLineRunner; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; + +/** + * @author Krzysztof Majewski + */ +@Component +public class CpuUsageEventConsumer { + + @Bean + WebClient client() { + return WebClient.create("http://localhost:8080"); + } + + @Bean + CommandLineRunner eventsConsumer(WebClient client) { + return args -> client.get() + .uri("/events/stream") + .accept(MediaType.TEXT_EVENT_STREAM) + .exchange() + .flatMapMany(cr -> cr.bodyToFlux(CpuUsageEvent.class)) + .subscribe(System.out::println); + } + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/CpuUsageEventProducer.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/CpuUsageEventProducer.java new file mode 100644 index 0000000000..5271debb84 --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/controller/CpuUsageEventProducer.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.controller; + +import com.baeldung.reactive.model.CpuUsageEvent; +import com.baeldung.reactive.utils.CpuUtils; +import java.time.Duration; +import java.time.LocalDateTime; +import java.util.stream.Stream; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.util.function.Tuple2; + +@RestController +public class CpuUsageEventProducer { + + @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/events/stream") + Flux events() { + Flux eventFlux = Flux + .fromStream(Stream.generate(() -> new CpuUsageEvent(CpuUtils.getUsage(), LocalDateTime.now()))); + Flux durationFlux = Flux.interval(Duration.ofSeconds(5)); + return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1); + } +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/model/CpuUsageEvent.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/model/CpuUsageEvent.java new file mode 100644 index 0000000000..1ad41be65c --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/model/CpuUsageEvent.java @@ -0,0 +1,17 @@ +package com.baeldung.reactive.model; + +import java.time.LocalDateTime; +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.ToString; + +@Data +@ToString +@AllArgsConstructor +public class CpuUsageEvent { + + private Double cpuUsage; + + private LocalDateTime time; + +} diff --git a/spring-5-reactive/src/main/java/com/baeldung/reactive/utils/CpuUtils.java b/spring-5-reactive/src/main/java/com/baeldung/reactive/utils/CpuUtils.java new file mode 100644 index 0000000000..c70aad9d2e --- /dev/null +++ b/spring-5-reactive/src/main/java/com/baeldung/reactive/utils/CpuUtils.java @@ -0,0 +1,14 @@ +package com.baeldung.reactive.utils; + +import com.sun.management.OperatingSystemMXBean; +import java.lang.management.ManagementFactory; + +public class CpuUtils { + + private static OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean(); + + public static Double getUsage() { + return (operatingSystemMXBean.getSystemCpuLoad() / operatingSystemMXBean.getAvailableProcessors()) * 100; + } + +} diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/controller/CpuUsageEventProducerIntegrationTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/controller/CpuUsageEventProducerIntegrationTest.java new file mode 100644 index 0000000000..81897cd8d6 --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/controller/CpuUsageEventProducerIntegrationTest.java @@ -0,0 +1,32 @@ +package com.baeldung.reactive.controller; + +import com.baeldung.reactive.Spring5ReactiveApplication; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.http.MediaType; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class) +public class CpuUsageEventProducerIntegrationTest { + + @Autowired + private WebTestClient webTestClient; + + @Test + public void whenGetCpuUsageEvents_thenReturns200() { + webTestClient.get() + .uri("/events/stream") + .accept(MediaType.TEXT_EVENT_STREAM) + .exchange() + .expectStatus().isOk() + .expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8) + .expectBody() + .jsonPath("$.cpuUsage").isNotEmpty() + .jsonPath("$.time").isNotEmpty(); + } + +} diff --git a/spring-5-reactive/src/test/java/com/baeldung/reactive/utils/CpuUtilsTest.java b/spring-5-reactive/src/test/java/com/baeldung/reactive/utils/CpuUtilsTest.java new file mode 100644 index 0000000000..a36f9c8528 --- /dev/null +++ b/spring-5-reactive/src/test/java/com/baeldung/reactive/utils/CpuUtilsTest.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.utils; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertThat; + +import com.baeldung.reactive.Spring5ReactiveApplication; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.test.context.junit4.SpringRunner; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class) +public class CpuUtilsTest { + + @Test + public void whenGetUsage_returnCorrectValue() { + Double usage = CpuUtils.getUsage(); + assertNotNull(usage); + assertThat(usage, Matchers.lessThanOrEqualTo(100d)); + } + +}