Used log() in StockClient and formatted in eclipse

This commit is contained in:
priyeshmashelkar 2018-05-24 13:06:02 +01:00
parent d5177c158e
commit c170f2111d
2 changed files with 17 additions and 21 deletions

View File

@ -6,19 +6,17 @@ import org.springframework.web.reactive.function.client.WebClient.RequestHeaders
import com.baeldung.reactive.model.Stock; import com.baeldung.reactive.model.Stock;
import reactor.core.publisher.Flux;
public class StockClient { public class StockClient {
public void getStockUpdates(String stockCode) { public Flux<Stock> getStockUpdates(String stockCode) {
WebClient client = WebClient.create("localhost:9111"); WebClient client = WebClient.create("localhost:8080");
RequestHeadersSpec<?> request = client.get().uri("/rtes/stocks/"+stockCode).accept(MediaType.TEXT_EVENT_STREAM); RequestHeadersSpec<?> request = client.get()
request.retrieve().bodyToFlux(Stock.class).toStream().forEach(System.out::println); .uri("/rtes/stocks/" + stockCode)
} .accept(MediaType.TEXT_EVENT_STREAM);
return request.retrieve()
public static void main(String[] args) throws InterruptedException { .bodyToFlux(Stock.class)
new StockClient().getStockUpdates("GOOGL"); .log();
}
while(true) {
Thread.sleep(1000L);
}
}
} }

View File

@ -23,16 +23,14 @@ public class StockReactiveController {
@GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/stocks/{code}") @GetMapping(produces = MediaType.TEXT_EVENT_STREAM_VALUE, value = "/stocks/{code}")
public Flux<Stock> getStocks(@PathVariable String code) { public Flux<Stock> getStocks(@PathVariable String code) {
BigDecimal startingPrice = new BigDecimal("100"); BigDecimal startingPrice = new BigDecimal("100");
Flux<Stock> stockFlux = Flux.fromStream(Stream.generate(() -> new Stock( Flux<Stock> stockFlux = Flux.fromStream(Stream.generate(() -> new Stock(code, getLatestPrice(startingPrice))));
code,
new Random().nextBoolean() ?
startingPrice.add(BigDecimal.valueOf(new Random().nextDouble())):
startingPrice.subtract(BigDecimal.valueOf(new Random().nextDouble())))));
Flux<Long> emmitFlux = Flux.interval(Duration.ofSeconds(1)); Flux<Long> emmitFlux = Flux.interval(Duration.ofSeconds(1));
return Flux.zip(stockFlux, emmitFlux).map(Tuple2::getT1); return Flux.zip(stockFlux, emmitFlux)
.map(Tuple2::getT1);
} }
public static void main(String [] args) { private BigDecimal getLatestPrice(BigDecimal startingPrice) {
BigDecimal priceChange = BigDecimal.valueOf(new Random().nextDouble());
return new Random().nextBoolean() ? startingPrice.add(priceChange) : startingPrice.subtract(priceChange);
} }
} }