Merge pull request #9376 from maryarm/BAEL-3287

BAEL-3287
This commit is contained in:
Eric Martin 2020-06-18 17:35:54 -05:00 committed by GitHub
commit 2457413be1
5 changed files with 187 additions and 0 deletions

View File

@ -16,6 +16,11 @@
</parent>
<dependencies>
<dependency>
<groupId>io.reactivex.rxjava3</groupId>
<artifactId>rxjava</artifactId>
<version>${rxjava.version}</version>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
@ -149,6 +154,7 @@
</pluginRepositories>
<properties>
<rxjava.version>3.0.0</rxjava.version>
<!-- testing -->
<assertj.version>3.10.0</assertj.version>
<junit.platform.version>1.2.0</junit.platform.version>

View File

@ -0,0 +1,71 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
public class FlowApiLiveVideo {
static class VideoPlayer implements Flow.Subscriber<VideoFrame> {
Flow.Subscription subscription = null;
private long consumerDelay = 30;
public VideoPlayer(long consumerDelay) {
this.consumerDelay = consumerDelay;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(VideoFrame item) {
try {
Thread.sleep(consumerDelay);
} catch (InterruptedException e) {
e.printStackTrace();
}
subscription.request(1);
}
@Override
public void onError(Throwable throwable) {
}
@Override
public void onComplete() {
}
}
static class VideoStreamServer extends SubmissionPublisher<VideoFrame> {
ScheduledExecutorService executor = null;
public VideoStreamServer(int bufferSize) {
super(Executors.newSingleThreadExecutor(), bufferSize);
executor = Executors.newScheduledThreadPool(1);
}
void startStreaming(long produceDelay, Runnable onDrop) {
AtomicLong frameNumber = new AtomicLong();
executor.scheduleWithFixedDelay(() -> {
offer(new VideoFrame(frameNumber.getAndIncrement()), (subscriber, videoFrame) -> {
subscriber.onError(new RuntimeException("Frame#" + videoFrame.getNumber() + " dropped because of back pressure"));
onDrop.run();
return true;
});
}, 0, produceDelay, TimeUnit.MILLISECONDS);
}
}
public static void streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError){
FlowApiLiveVideo.VideoStreamServer streamServer = new FlowApiLiveVideo.VideoStreamServer(bufferSize);
streamServer.subscribe(new FlowApiLiveVideo.VideoPlayer(consumeDelay));
streamServer.startStreaming(produceDelay, onError);
}
}

View File

@ -0,0 +1,61 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import org.junit.Test;
import org.junit.jupiter.api.Assertions;
import static org.awaitility.Awaitility.await;
public class LiveVideoFlowVsRxUnitTest {
private final static long SLOW_CONSUMER_DELAY = 30;
private final static long FAST_CONSUMER_DELAY = 1;
private final static long PRODUCER_DELAY = 1;
private final static int BUFFER_SIZE = 10;
private final static long AWAIT = 1000;
@Test
public void givenSlowVideoPlayer_whenSubscribedToFlowApiLiveVideo_thenExpectErrorOnBackPressure() {
AtomicLong errors = new AtomicLong();
FlowApiLiveVideo.streamLiveVideo(PRODUCER_DELAY, SLOW_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
await()
.atMost(AWAIT, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(errors.get() > 0));
}
@Test
public void givenFastVideoPlayer_whenSubscribedToFlowApiLiveVideo_thenExpectNoErrorOnBackPressure() throws InterruptedException {
AtomicLong errors = new AtomicLong();
FlowApiLiveVideo.streamLiveVideo(PRODUCER_DELAY, FAST_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
Thread.sleep(AWAIT);
Assertions.assertEquals(0, errors.get());
}
@Test
public void givenSlowVideoPlayer_whenSubscribedToRxJavaLiveVideo_thenExpectErrorOnBackPressure() {
AtomicLong errors = new AtomicLong();
RxJavaLiveVideo.streamLiveVideo(PRODUCER_DELAY, SLOW_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
await()
.atMost(AWAIT, TimeUnit.MILLISECONDS)
.untilAsserted(() -> Assertions.assertTrue(errors.get() > 0));
}
@Test
public void givenFastVideoPlayer_whenSubscribedToRxJavaLiveVideo_thenExpectNoErrorOnBackPressure() throws InterruptedException {
AtomicLong errors = new AtomicLong();
RxJavaLiveVideo.streamLiveVideo(PRODUCER_DELAY, FAST_CONSUMER_DELAY, BUFFER_SIZE, errors::incrementAndGet);
Thread.sleep(AWAIT);
Assertions.assertEquals(0, errors.get());
}
}

View File

@ -0,0 +1,36 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
import io.reactivex.rxjava3.core.BackpressureOverflowStrategy;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.disposables.Disposable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.concurrent.Executors;
import java.util.stream.Stream;
public class RxJavaLiveVideo {
public static Disposable streamLiveVideo(long produceDelay, long consumeDelay, int bufferSize, Runnable onError) {
return Flowable
.fromStream(Stream.iterate(new VideoFrame(0), videoFrame -> {
sleep(produceDelay);
return new VideoFrame(videoFrame.getNumber() + 1);
}))
.subscribeOn(Schedulers.from(Executors.newSingleThreadScheduledExecutor()), true)
.onBackpressureBuffer(bufferSize, null, BackpressureOverflowStrategy.ERROR)
.observeOn(Schedulers.from(Executors.newSingleThreadExecutor()))
.subscribe(item -> {
sleep(consumeDelay);
}, throwable -> {
onError.run();
});
}
private static void sleep(long i) {
try {
Thread.sleep(i);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

View File

@ -0,0 +1,13 @@
package com.baeldung.java9.streams.reactive.flowvsrx;
class VideoFrame {
private long number;
public VideoFrame(long number) {
this.number = number;
}
public long getNumber() {
return number;
}
}