BAEL-3287

Code examples for "The Difference between RxJava API and the Java 9 Flow API"
This commit is contained in:
maryarm 2020-05-27 18:39:23 +04:30
parent 2bdba48843
commit 1fd16afaca
4 changed files with 129 additions and 0 deletions

View File

@ -16,6 +16,11 @@
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@ -58,6 +63,7 @@
<junit.platform.version>1.2.0</junit.platform.version>
<maven.compiler.source>1.9</maven.compiler.source>
<maven.compiler.target>1.9</maven.compiler.target>
<rxjava.version>3.0.0</rxjava.version>
</properties>
</project>

View File

@ -0,0 +1,72 @@
package com.baeldung.java9.streams.reactive.vsrx;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class FlowApiLiveVideo {
private static Logger log = LoggerFactory.getLogger(FlowApiLiveVideo.class);
static class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(VideoFrame item) {
log.info("play #{}", item.getNumber());
sleep(30);
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
log.error("There is an error in video streaming:{}", throwable.getMessage());
}
@Override
public void onComplete() {
log.error("Video has ended");
}
}
static class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
public VideoStreamServer() {
super(Executors.newSingleThreadExecutor(), 1);
}
}
private static void sleep(int i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
VideoStreamServer streamServer = new VideoStreamServer();
streamServer.subscribe(new VideoPlayer());
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
streamServer.offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " droped because of back pressure"));
return true;
});
}, 0, 1, TimeUnit.MILLISECONDS);
sleep(1000);
}
}

View File

@ -0,0 +1,38 @@
package com.baeldung.java9.streams.reactive.vsrx;
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class RxJavaLiveVideo {
private static Logger log = LoggerFactory.getLogger(RxJavaLiveVideo.class);
public static void main(String[] args) {
Flowable
.fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> {
sleep(1);
return new VideoFrame(videoFrame.getNumber() + 1);
}))
.onBackpressureBuffer(10, null, BackpressureOverflowStrategy.ERROR)
.subscribeOn(Schedulers.from(Executors.newSingleThreadScheduledExecutor()), true)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.doOnError(Throwable::printStackTrace)
.subscribe(item -> {
log.info("play #" + item.getNumber());
sleep(30);
});
}
private static void sleep(int i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.java9.streams.reactive.vsrx;
class VideoFrame {
private long number;
public VideoFrame(long number) {
this.number = number;
}
public long getNumber() {
return number;
}
}