From 47abbf654b5bee31721b9d40d0115471b114142d Mon Sep 17 00:00:00 2001 From: psevestre Date: Thu, 14 Oct 2021 01:04:19 -0300 Subject: [PATCH] BAEL-772 Reactive Streams API with Ratpack (#11328) * [BAEL-4849] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Article code * [BAEL-4968] Remove extra comments * [BAEL-4020] Article code * [BAEL-722] Article code --- .../main/java/com/baeldung/model/Quote.java | 107 +++++++++ .../rxjava/service/QuotesService.java | 44 ++++ .../spring/EmbedRatpackStreamsApp.java | 206 ++++++++++++++++++ .../baeldung/ratpack/CompliantPublisher.java | 63 ++++++ .../baeldung/ratpack/LoggingSubscriber.java | 67 ++++++ .../ratpack/NonCompliantPublisher.java | 46 ++++ .../ratpack/RatpackStreamsUnitTest.java | 140 ++++++++++++ 7 files changed, 673 insertions(+) create mode 100644 ratpack/src/main/java/com/baeldung/model/Quote.java create mode 100644 ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java create mode 100644 ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java create mode 100644 ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java diff --git a/ratpack/src/main/java/com/baeldung/model/Quote.java b/ratpack/src/main/java/com/baeldung/model/Quote.java new file mode 100644 index 0000000000..009a85fa11 --- /dev/null +++ b/ratpack/src/main/java/com/baeldung/model/Quote.java @@ -0,0 +1,107 @@ +package com.baeldung.model; + +import java.time.Instant; + +public class Quote { + + private Instant ts; + private String symbol; + private double value; + + public Quote() {} + + + public Quote(Instant ts, String symbol, double value) { + this.ts = ts; + this.symbol = symbol; + this.value = value; + } + + + /** + * @return the ts + */ + public Instant getTs() { + return ts; + } + + /** + * @param ts the ts to set + */ + public void setTs(Instant ts) { + this.ts = ts; + } + + /** + * @return the symbol + */ + public String getSymbol() { + return symbol; + } + + /** + * @param symbol the symbol to set + */ + public void setSymbol(String symbol) { + this.symbol = symbol; + } + + /** + * @return the value + */ + public double getValue() { + return value; + } + + /** + * @param value the value to set + */ + public void setValue(double value) { + this.value = value; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + result = prime * result + ((symbol == null) ? 0 : symbol.hashCode()); + result = prime * result + ((ts == null) ? 0 : ts.hashCode()); + long temp; + temp = Double.doubleToLongBits(value); + result = prime * result + (int) (temp ^ (temp >>> 32)); + return result; + } + + + @Override + public boolean equals(Object obj) { + if (this == obj) + return true; + if (obj == null) + return false; + if (getClass() != obj.getClass()) + return false; + Quote other = (Quote) obj; + if (symbol == null) { + if (other.symbol != null) + return false; + } else if (!symbol.equals(other.symbol)) + return false; + if (ts == null) { + if (other.ts != null) + return false; + } else if (!ts.equals(other.ts)) + return false; + if (Double.doubleToLongBits(value) != Double.doubleToLongBits(other.value)) + return false; + return true; + } + + + @Override + public String toString() { + return "Quote [ts=" + ts + ", symbol=" + symbol + ", value=" + value + "]"; + } + + +} diff --git a/ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java b/ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java new file mode 100644 index 0000000000..7c073ee1de --- /dev/null +++ b/ratpack/src/main/java/com/baeldung/rxjava/service/QuotesService.java @@ -0,0 +1,44 @@ +package com.baeldung.rxjava.service; + +import java.time.Duration; +import java.time.Instant; +import java.util.Random; +import java.util.concurrent.ScheduledExecutorService; + +import org.reactivestreams.Publisher; + +import com.baeldung.model.Quote; + +import ratpack.stream.Streams; + +public class QuotesService { + + private final ScheduledExecutorService executorService; + private static Random rnd = new Random(); + private static String[] symbols = new String[] { + "MSFT", + "ORCL", + "GOOG", + "AAPL", + "CSCO" + }; + + public QuotesService(ScheduledExecutorService executorService) { + this.executorService = executorService; + } + + public Publisher newTicker() { + return Streams.periodically(executorService, Duration.ofSeconds(2), (t) -> { + + return randomQuote(); + }); + } + + private static Quote randomQuote() { + return new Quote ( + Instant.now(), + symbols[rnd.nextInt(symbols.length)], + Math.round(rnd.nextDouble()*100) + ); + } +} diff --git a/ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java b/ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java new file mode 100644 index 0000000000..dc66efbecb --- /dev/null +++ b/ratpack/src/main/java/com/baeldung/spring/EmbedRatpackStreamsApp.java @@ -0,0 +1,206 @@ +package com.baeldung.spring; + +import java.util.Random; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicLong; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; + +import com.baeldung.model.Quote; +import com.baeldung.rxjava.service.QuotesService; + +import groovy.util.logging.Slf4j; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import ratpack.func.Action; +import ratpack.handling.Chain; +import ratpack.http.ResponseChunks; +import ratpack.http.Status; +import ratpack.server.ServerConfig; +import ratpack.spring.config.EnableRatpack; +import ratpack.sse.ServerSentEvents; +import ratpack.stream.Streams; +import ratpack.stream.TransformablePublisher; +import ratpack.websocket.WebSockets; +import rx.subscriptions.Subscriptions; + +/** + * @author psevestre + */ +@SpringBootApplication +@EnableRatpack +public class EmbedRatpackStreamsApp { + + private static final Logger log = LoggerFactory.getLogger(EmbedRatpackStreamsApp.class); + + @Autowired + private QuotesService quotesService; + + private AtomicLong idSeq = new AtomicLong(0); + + + @Bean + public ScheduledExecutorService executorService() { + return Executors.newScheduledThreadPool(1); + } + + @Bean + public QuotesService quotesService(ScheduledExecutorService executor) { + return new QuotesService(executor); + } + + @Bean + public Action quotes() { + ServerSentEvents sse = ServerSentEvents.serverSentEvents(quotesService.newTicker(), (evt) -> { + evt + .id(Long.toString(idSeq.incrementAndGet())) + .event("quote") + .data( q -> q.toString()); + }); + + return chain -> chain.get("quotes", ctx -> ctx.render(sse)); + } + + @Bean + public Action quotesWS() { + Publisher pub = Streams.transformable(quotesService.newTicker()) + .map(Quote::toString); + return chain -> chain.get("quotes-ws", ctx -> WebSockets.websocketBroadcast(ctx, pub)); + } + + @Bean + public Action uploadFile() { + + return chain -> chain.post("upload", ctx -> { + TransformablePublisher pub = ctx.getRequest().getBodyStream(); + pub.subscribe(new Subscriber() { + private Subscription sub; + @Override + public void onSubscribe(Subscription sub) { + this.sub = sub; + sub.request(1); + } + + @Override + public void onNext(ByteBuf t) { + try { + int len = t.readableBytes(); + log.info("Got {} bytes", len); + + // Do something useful with data + + // Request next chunk + sub.request(1); + } + finally { + // DO NOT FORGET to RELEASE ! + t.release(); + } + } + + @Override + public void onError(Throwable t) { + ctx.getResponse().status(500); + } + + @Override + public void onComplete() { + ctx.getResponse().status(202); + } + }); + }); + } + + @Bean + public Action download() { + return chain -> chain.get("download", ctx -> { + ctx.getResponse().sendStream(new RandomBytesPublisher(1024,512)); + }); + } + + @Bean + public Action downloadChunks() { + return chain -> chain.get("downloadChunks", ctx -> { + ctx.render(ResponseChunks.bufferChunks("application/octetstream", + new RandomBytesPublisher(1024,512))); + }); + } + + @Bean + public ServerConfig ratpackServerConfig() { + return ServerConfig + .builder() + .findBaseDir("public") + .build(); + } + + public static void main(String[] args) { + SpringApplication.run(EmbedRatpackStreamsApp.class, args); + } + + + public static class RandomBytesPublisher implements Publisher { + + private int bufCount; + private int bufSize; + private Random rnd = new Random(); + + + RandomBytesPublisher(int bufCount, int bufSize) { + this.bufCount = bufCount; + this.bufSize = bufSize; + } + + @Override + public void subscribe(Subscriber s) { + s.onSubscribe(new Subscription() { + + private boolean cancelled = false; + private boolean recurse; + private long requested = 0; + + @Override + public void request(long n) { + if ( bufCount == 0 ) { + s.onComplete(); + return; + } + + requested += n; + if ( recurse ) { + return; + } + + recurse = true; + try { + while ( requested-- > 0 && !cancelled && bufCount-- > 0 ) { + byte[] data = new byte[bufSize]; + rnd.nextBytes(data); + ByteBuf buf = Unpooled.wrappedBuffer(data); + s.onNext(buf); + } + } + finally { + recurse = false; + } + } + + @Override + public void cancel() { + cancelled = true; + } + }); + + } + } + +} diff --git a/ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java b/ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java new file mode 100644 index 0000000000..5526a630ff --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/CompliantPublisher.java @@ -0,0 +1,63 @@ +package com.baeldung.ratpack; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +// Non-thread safe !!! +class CompliantPublisher implements Publisher { + String name; + + private static final Logger log = LoggerFactory.getLogger(CompliantPublisher.class); + private long available; + + + public CompliantPublisher(long available) { + this.available = available; + } + + @Override + public void subscribe(Subscriber subscriber) { + log.info("subscribe"); + subscriber.onSubscribe(new CompliantSubscription(subscriber)); + + } + + + private class CompliantSubscription implements Subscription { + + private Subscriber subscriber; + private int recurseLevel; + private long requested; + private boolean cancelled; + + public CompliantSubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + log.info("request: requested={}, available={}", n, available); + requested += n; + if ( recurseLevel > 0 ) { + return; + } + + recurseLevel++; + for (int i = 0 ; i < (requested) && !cancelled && available > 0 ; i++, available-- ) { + subscriber.onNext(i); + } + subscriber.onComplete(); + } + + @Override + public void cancel() { + cancelled = true; + } + + } + +} \ No newline at end of file diff --git a/ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java b/ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java new file mode 100644 index 0000000000..0d58b7c05e --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/LoggingSubscriber.java @@ -0,0 +1,67 @@ +package com.baeldung.ratpack; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LoggingSubscriber implements Subscriber { + private static final Logger log = LoggerFactory.getLogger(LoggingSubscriber.class); + + private Subscription subscription; + private long requested; + private long received; + private CountDownLatch finished = new CountDownLatch(1); + + @Override + public void onComplete() { + log.info("onComplete: sub={}", subscription.hashCode()); + finished.countDown(); + } + + @Override + public void onError(Throwable t) { + log.error("Error: sub={}, message={}", subscription.hashCode(), t.getMessage(),t); + finished.countDown(); + } + + @Override + public void onNext(T value) { + log.info("onNext: sub={}, value={}", subscription.hashCode(), value); + this.received++; + this.requested++; + subscription.request(1); + } + + @Override + public void onSubscribe(Subscription sub) { + log.info("onSubscribe: sub={}", sub.hashCode()); + this.subscription = sub; + this.received = 0; + this.requested = 1; + sub.request(1); + } + + + public long getRequested() { + return requested; + } + + public long getReceived() { + return received; + } + + public void block() { + try { + finished.await(10, TimeUnit.SECONDS); + } + catch(InterruptedException iex) { + throw new RuntimeException(iex); + } + } + +} diff --git a/ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java b/ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java new file mode 100644 index 0000000000..03b94d429d --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/NonCompliantPublisher.java @@ -0,0 +1,46 @@ +package com.baeldung.ratpack; + +import org.reactivestreams.Publisher; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class NonCompliantPublisher implements Publisher { + + private static final Logger log = LoggerFactory.getLogger(NonCompliantPublisher.class); + + @Override + public void subscribe(Subscriber subscriber) { + log.info("subscribe"); + subscriber.onSubscribe(new NonCompliantSubscription(subscriber)); + } + + private class NonCompliantSubscription implements Subscription { + private Subscriber subscriber; + private int recurseLevel = 0; + + public NonCompliantSubscription(Subscriber subscriber) { + this.subscriber = subscriber; + } + + @Override + public void request(long n) { + log.info("request: n={}", n); + if ( recurseLevel > 0 ) { + return; + } + + recurseLevel++; + for (int i = 0 ; i < (n + 5) ; i ++ ) { + subscriber.onNext(i); + } + subscriber.onComplete(); + } + + @Override + public void cancel() { + } + } +} \ No newline at end of file diff --git a/ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java b/ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java new file mode 100644 index 0000000000..54cc71c328 --- /dev/null +++ b/ratpack/src/test/java/com/baeldung/ratpack/RatpackStreamsUnitTest.java @@ -0,0 +1,140 @@ +package com.baeldung.ratpack; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.time.Duration; +import java.util.Arrays; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.junit.jupiter.api.Test; +import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import ratpack.exec.ExecResult; +import ratpack.func.Action; +import ratpack.stream.StreamEvent; +import ratpack.stream.Streams; +import ratpack.stream.TransformablePublisher; +import ratpack.test.exec.ExecHarness; + +public class RatpackStreamsUnitTest { + + private static Logger log = LoggerFactory.getLogger(RatpackStreamsUnitTest.class); + + @Test + public void whenPublish_thenSuccess() { + + Publisher pub = Streams.publish(Arrays.asList("hello", "hello again")); + LoggingSubscriber sub = new LoggingSubscriber(); + pub.subscribe(sub); + sub.block(); + } + + + @Test + public void whenYield_thenSuccess() { + + Publisher pub = Streams.yield((t) -> { + return t.getRequestNum() < 5 ? "hello" : null; + }); + + LoggingSubscriber sub = new LoggingSubscriber(); + pub.subscribe(sub); + sub.block(); + assertEquals(5, sub.getReceived()); + } + + @Test + public void whenPeriodic_thenSuccess() { + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + Publisher pub = Streams.periodically(executor, Duration.ofSeconds(1), (t) -> { + return t < 5 ? String.format("hello %d",t): null; + }); + + LoggingSubscriber sub = new LoggingSubscriber(); + pub.subscribe(sub); + sub.block(); + assertEquals(5, sub.getReceived()); + } + + @Test + public void whenMap_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.yield( t -> { + return t.getRequestNum() < 5 ? t.getRequestNum() : null; + }) + .map(v -> String.format("item %d", v)); + + ExecResult> result = ExecHarness.yieldSingle((c) -> pub.toList() ); + assertTrue("should succeed", result.isSuccess()); + assertEquals("should have 5 items",5,result.getValue().size()); + } + + @Test + public void whenNonCompliantPublisherWithBuffer_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.transformable(new NonCompliantPublisher()) + .wiretap(new LoggingAction("before buffer")) + .buffer() + .wiretap(new LoggingAction("after buffer")) + .take(1); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); + } + + @Test + public void whenNonCompliantPublisherWithoutBuffer_thenSuccess() throws Exception { + TransformablePublisher pub = Streams.transformable(new NonCompliantPublisher()) + .wiretap(new LoggingAction("")) + .take(1); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); + } + +@Test +public void whenCompliantPublisherWithoutBatch_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.transformable(new CompliantPublisher(10)) + .wiretap(new LoggingAction("")); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); +} + +@Test +public void whenCompliantPublisherWithBatch_thenSuccess() throws Exception { + + TransformablePublisher pub = Streams.transformable(new CompliantPublisher(10)) + .wiretap(new LoggingAction("before batch")) + .batch(5, Action.noop()) + .wiretap(new LoggingAction("after batch")); + + LoggingSubscriber sub = new LoggingSubscriber<>(); + pub.subscribe(sub); + sub.block(); +} + + private static class LoggingAction implements Action>{ + private final String label; + + public LoggingAction(String label) { + this.label = label; + } + + @Override + public void execute(StreamEvent e) throws Exception { + log.info("{}: event={}", label,e); + } + + } + +}