A short example of real-time event streaming using Spring Webflux

This commit is contained in:
Krzysztof Majewski 2018-07-04 12:32:22 +02:00
parent 684324fc10
commit 487557faf2
6 changed files with 142 additions and 0 deletions

View File

@ -0,0 +1,31 @@
package com.baeldung.reactive.client;
import com.baeldung.reactive.model.CpuUsageEvent;
import org.springframework.boot.CommandLineRunner;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.stereotype.Component;
import org.springframework.web.reactive.function.client.WebClient;
/**
* @author Krzysztof Majewski
*/
@Component
public class CpuUsageEventConsumer {
@Bean
WebClient client() {
return WebClient.create("http://localhost:8080");
}
@Bean
CommandLineRunner eventsConsumer(WebClient client) {
return args -> client.get()
.uri("/events/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.flatMapMany(cr -> cr.bodyToFlux(CpuUsageEvent.class))
.subscribe(System.out::println);
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.reactive.controller;
import com.baeldung.reactive.model.CpuUsageEvent;
import com.baeldung.reactive.utils.CpuUtils;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.stream.Stream;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import reactor.core.publisher.Flux;
import reactor.util.function.Tuple2;
@RestController
public class CpuUsageEventProducer {
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/events/stream")
Flux<CpuUsageEvent> events() {
Flux<CpuUsageEvent> eventFlux = Flux
.fromStream(Stream.generate(() -> new CpuUsageEvent(CpuUtils.getUsage(), LocalDateTime.now())));
Flux<Long> durationFlux = Flux.interval(Duration.ofSeconds(5));
return Flux.zip(eventFlux, durationFlux).map(Tuple2::getT1);
}
}

View File

@ -0,0 +1,17 @@
package com.baeldung.reactive.model;
import java.time.LocalDateTime;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.ToString;
@Data
@ToString
@AllArgsConstructor
public class CpuUsageEvent {
private Double cpuUsage;
private LocalDateTime time;
}

View File

@ -0,0 +1,14 @@
package com.baeldung.reactive.utils;
import com.sun.management.OperatingSystemMXBean;
import java.lang.management.ManagementFactory;
public class CpuUtils {
private static OperatingSystemMXBean operatingSystemMXBean = (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
public static Double getUsage() {
return (operatingSystemMXBean.getSystemCpuLoad() / operatingSystemMXBean.getAvailableProcessors()) * 100;
}
}

View File

@ -0,0 +1,32 @@
package com.baeldung.reactive.controller;
import com.baeldung.reactive.Spring5ReactiveApplication;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.http.MediaType;
import org.springframework.test.context.junit4.SpringRunner;
import org.springframework.test.web.reactive.server.WebTestClient;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
public class CpuUsageEventProducerIntegrationTest {
@Autowired
private WebTestClient webTestClient;
@Test
public void whenGetCpuUsageEvents_thenReturns200() {
webTestClient.get()
.uri("/events/stream")
.accept(MediaType.TEXT_EVENT_STREAM)
.exchange()
.expectStatus().isOk()
.expectHeader().contentType(MediaType.APPLICATION_JSON_UTF8)
.expectBody()
.jsonPath("$.cpuUsage").isNotEmpty()
.jsonPath("$.time").isNotEmpty();
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.reactive.utils;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertThat;
import com.baeldung.reactive.Spring5ReactiveApplication;
import org.hamcrest.Matchers;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class)
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = Spring5ReactiveApplication.class)
public class CpuUtilsTest {
@Test
public void whenGetUsage_returnCorrectValue() {
Double usage = CpuUtils.getUsage();
assertNotNull(usage);
assertThat(usage, Matchers.lessThanOrEqualTo(100d));
}
}