diff --git a/core-java-modules/core-java-9-new-features/pom.xml b/core-java-modules/core-java-9-new-features/pom.xml index b0fb6ab7f9..23dc659af8 100644 --- a/core-java-modules/core-java-9-new-features/pom.xml +++ b/core-java-modules/core-java-9-new-features/pom.xml @@ -16,6 +16,11 @@ + + io.reactivex.rxjava3 + rxjava + ${rxjava.version} + org.assertj assertj-core @@ -58,6 +63,7 @@ 1.2.0 1.9 1.9 + 3.0.0 diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java new file mode 100644 index 0000000000..17168dc6a1 --- /dev/null +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/FlowApiLiveVideo.java @@ -0,0 +1,72 @@ +package com.baeldung.java9.streams.reactive.vsrx; + +import java.util.concurrent.Executors; +import java.util.concurrent.Flow; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class FlowApiLiveVideo { + + private static Logger log = LoggerFactory.getLogger(FlowApiLiveVideo.class); + + static class VideoPlayer implements Flow.Subscriber { + Flow.Subscription subscription = null; + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(VideoFrame item) { + log.info("play #{}", item.getNumber()); + sleep(30); + subscription.request(1); + } + + @Override + public void onError(Throwable throwable) { + log.error("There is an error in video streaming:{}", throwable.getMessage()); + } + + @Override + public void onComplete() { + log.error("Video has ended"); + } + } + + static class VideoStreamServer extends SubmissionPublisher { + public VideoStreamServer() { + super(Executors.newSingleThreadExecutor(), 1); + } + } + + private static void sleep(int i) { + try { + Thread.sleep(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + + public static void main(String[] args) { + VideoStreamServer streamServer = new VideoStreamServer(); + streamServer.subscribe(new VideoPlayer()); + + ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); + AtomicLong frameNumber = new AtomicLong(); + executor.scheduleWithFixedDelay(() -> { + streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> { + subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " droped because of back pressure")); + return true; + }); + }, 0, 1, TimeUnit.MILLISECONDS); + + sleep(1000); + } +} diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java new file mode 100644 index 0000000000..abd976c283 --- /dev/null +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/RxJavaLiveVideo.java @@ -0,0 +1,38 @@ +package com.baeldung.java9.streams.reactive.vsrx; + +import io.reactivex.rxjava3.core.BackpressureOverflowStrategy; +import io.reactivex.rxjava3.core.Flowable; +import io.reactivex.rxjava3.schedulers.Schedulers; +import java.util.concurrent.Executors; +import java.util.stream.Stream; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class RxJavaLiveVideo { + + private static Logger log = LoggerFactory.getLogger(RxJavaLiveVideo.class); + + public static void main(String[] args) { + Flowable + .fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> { + sleep(1); + return new VideoFrame(videoFrame.getNumber() + 1); + })) + .onBackpressureBuffer(10, null, BackpressureOverflowStrategy.ERROR) + .subscribeOn(Schedulers.from(Executors.newSingleThreadScheduledExecutor()), true) + .observeOn(Schedulers.from(Executors.newSingleThreadExecutor())) + .doOnError(Throwable::printStackTrace) + .subscribe(item -> { + log.info("play #" + item.getNumber()); + sleep(30); + }); + } + + private static void sleep(int i) { + try { + Thread.sleep(i); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } +} diff --git a/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java new file mode 100644 index 0000000000..609e17910b --- /dev/null +++ b/core-java-modules/core-java-9-new-features/src/main/java/com/baeldung/java9/streams.reactive/vsrx/VideoFrame.java @@ -0,0 +1,13 @@ +package com.baeldung.java9.streams.reactive.vsrx; + +class VideoFrame { + private long number; + + public VideoFrame(long number) { + this.number = number; + } + + public long getNumber() { + return number; + } +} \ No newline at end of file