Bean Object, server side and client side example for event streaming

example
This commit is contained in:
tritty 2018-05-09 03:58:04 +05:30 committed by Tritty
parent f60debdcd3
commit d1b8f9b9b3
3 changed files with 86 additions and 0 deletions

View File

@ -0,0 +1,26 @@
package com.baeldung.reactive.controller;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
import java.util.stream.Stream;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import com.baeldung.reactive.model.Stock;
import reactor.core.publisher.Flux;
@RestController
public class StockBrokerController {
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/getStockPrice")
public Flux<Object> getStockUpdates() {
final Flux<Stock> stockFlux = Flux.fromStream(Stream.generate(() -> new Stock(new Random().nextFloat(), "WebFluxStock", new Date())));
final Flux<Long> intervalFlux = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(stockFlux, intervalFlux)
.map(t1 -> t1.getT1());
}
}

View File

@ -0,0 +1,18 @@
package com.baeldung.reactive.model;
import java.util.Date;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
@AllArgsConstructor
@Data
@NoArgsConstructor
public class Stock {
private float price;
private String name;
private Date date;
}

View File

@ -0,0 +1,42 @@
package com.baeldung.reactive.webclient;
import java.util.Collections;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.builder.SpringApplicationBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.http.MediaType;
import org.springframework.web.reactive.function.client.WebClient;
import com.baeldung.reactive.model.Stock;
@SpringBootApplication
public class StockClient {
@Bean
WebClient webClient() {
return WebClient.builder()
.baseUrl("http://localhost:8080/getStockPrice")
.build();
}
@Bean
CommandLineRunner runner(WebClient webClient) {
return args -> {
webClient.get()
.accept(MediaType.TEXT_EVENT_STREAM)
.retrieve()
.bodyToFlux(Stock.class)
.log()
.subscribe(System.out::println);
};
}
public static void main(String args[]) throws InterruptedException {
new SpringApplicationBuilder(StockClient.class).properties(Collections.singletonMap("server.port", "9090"))
.run(args);
}
}