From 1fd16afaca102eb0b365efe92dfa882579da8290 Mon Sep 17 00:00:00 2001 From: maryarm Date: Wed, 27 May 2020 18:39:23 +0430 Subject: [PATCH 1/2] BAEL-3287 Code examples for "The Difference between RxJava API and the Java 9 Flow API" --- .../core-java-9-new-features/pom.xml | 6 ++ .../vsrx/FlowApiLiveVideo.java | 72 +++++++++++++++++++ .../vsrx/RxJavaLiveVideo.java | 38 ++++++++++ .../streams.reactive/vsrx/VideoFrame.java | 13 ++++ 4 files changed, 129 insertions(+) create mode 100644 core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java create mode 100644 core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java create mode 100644 core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java diff --git a/core-java-modules/core-java-9-new-features/pom.xml b/core-java-modules/core-java-9-new-features/pom.xml index b0fb6ab7f9..23dc659af8 100644 --- a/core-java-modules/core-java-9-new-features/pom.xml +++ b/core-java-modules/core-java-9-new-features/pom.xml @@ -16,6 +16,11 @@ + + io.reactivex.rxjava3 + rxjava + ${rxjava.version} + org.assertj assertj-core @@ -58,6 +63,7 @@ 1.2.0 1.9 1.9 + 3.0.0 diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java new file mode 100644 index 0000000000..17168dc6a1 --- /dev/null +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java @@ -0,0 +1,72 @@ +package com.baeldung.java9.streams.reactive.vsrx; + +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlowApiLiveVideo { + + private static Logger log = LoggerFactory.getLogger(FlowApiLiveVideo.class); + + static class VideoPlayer implements Flow.Subscriber { + Flow.Subscription subscription = null; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(VideoFrame item) { + log.info("play #{}", item.getNumber()); + sleep(30); + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + log.error("There is an error in video streaming:{}", throwable.getMessage()); + } + + @Override + public void onComplete() { + log.error("Video has ended"); + } + } + + static class VideoStreamServer extends SubmissionPublisher { + public VideoStreamServer() { + super(Executors.newSingleThreadExecutor(), 1); + } + } + + private static void sleep(int i) { + try { + Thread.sleep(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + VideoStreamServer streamServer = new VideoStreamServer(); + streamServer.subscribe(new VideoPlayer()); + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + AtomicLong frameNumber = new AtomicLong(); + executor.scheduleWithFixedDelay(() -> { + streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> { + subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " droped because of back pressure")); + return true; + }); + }, 0, 1, TimeUnit.MILLISECONDS); + + sleep(1000); + } +} diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java new file mode 100644 index 0000000000..abd976c283 --- /dev/null +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java @@ -0,0 +1,38 @@ +package com.baeldung.java9.streams.reactive.vsrx; + +import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import java.util.concurrent.Executors; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RxJavaLiveVideo { + + private static Logger log = LoggerFactory.getLogger(RxJavaLiveVideo.class); + + public static void main(String[] args) { + Flowable + .fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> { + sleep(1); + return new VideoFrame(videoFrame.getNumber() + 1); + })) + .onBackpressureBuffer(10, null, BackpressureOverflowStrategy.ERROR) + .subscribeOn(Schedulers.from(Executors.newSingleThreadScheduledExecutor()), true) + .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) + .doOnError(Throwable::printStackTrace) + .subscribe(item -> { + log.info("play #" + item.getNumber()); + sleep(30); + }); + } + + private static void sleep(int i) { + try { + Thread.sleep(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java new file mode 100644 index 0000000000..609e17910b --- /dev/null +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java @@ -0,0 +1,13 @@ +package com.baeldung.java9.streams.reactive.vsrx; + +class VideoFrame { + private long number; + + public VideoFrame(long number) { + this.number = number; + } + + public long getNumber() { + return number; + } +} \ No newline at end of file From a1f8f90bd8428030cd72d094d81c364aa3cb90e2 Mon Sep 17 00:00:00 2001 From: maryarm Date: Mon, 1 Jun 2020 00:22:31 +0430 Subject: [PATCH 2/2] BAEL-3287: rename the package to apply review note --- .../vsrx => streams/reactive/flowvsrx}/FlowApiLiveVideo.java | 2 +- .../vsrx => streams/reactive/flowvsrx}/RxJavaLiveVideo.java | 2 +- .../vsrx => streams/reactive/flowvsrx}/VideoFrame.java | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/{streams.reactive/vsrx => streams/reactive/flowvsrx}/FlowApiLiveVideo.java (97%) rename core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/{streams.reactive/vsrx => streams/reactive/flowvsrx}/RxJavaLiveVideo.java (95%) rename core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/{streams.reactive/vsrx => streams/reactive/flowvsrx}/VideoFrame.java (76%) diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/FlowApiLiveVideo.java similarity index 97% rename from core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java rename to core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/FlowApiLiveVideo.java index 17168dc6a1..6f0b49ea0c 100644 --- a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/FlowApiLiveVideo.java @@ -1,4 +1,4 @@ -package com.baeldung.java9.streams.reactive.vsrx; +package com.baeldung.java9.streams.reactive.flowvsrx; import java.util.concurrent.Executors; import java.util.concurrent.Flow; diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/RxJavaLiveVideo.java similarity index 95% rename from core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java rename to core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/RxJavaLiveVideo.java index abd976c283..916c930b38 100644 --- a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/RxJavaLiveVideo.java @@ -1,4 +1,4 @@ -package com.baeldung.java9.streams.reactive.vsrx; +package com.baeldung.java9.streams.reactive.flowvsrx; import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; import io.reactivex.rxjava3.core.Flowable; diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/VideoFrame.java similarity index 76% rename from core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java rename to core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/VideoFrame.java index 609e17910b..b6e3de13c3 100644 --- a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams/reactive/flowvsrx/VideoFrame.java @@ -1,4 +1,4 @@ -package com.baeldung.java9.streams.reactive.vsrx; +package com.baeldung.java9.streams.reactive.flowvsrx; class VideoFrame { private long number;