From 9184693a2b75b16e4f36bb64b4aceacb0a2994e8 Mon Sep 17 00:00:00 2001 From: Tomasz Lelek Date: Fri, 2 Jun 2017 18:48:17 +0200 Subject: [PATCH] BAEL-679 code for reactive streams (#1970) * BAEL-679 code for reactive streams * BAEL-679 typo --- .../java9/streams.reactive/EndSubscriber.java | 43 ++++++++++++ .../streams.reactive/TransformProcessor.java | 38 ++++++++++ .../streams.reactive/ReactiveStreamsTest.java | 69 +++++++++++++++++++ 3 files changed, 150 insertions(+) create mode 100644 core-java-9/src/main/java/com/baeldung/java9/streams.reactive/EndSubscriber.java create mode 100644 core-java-9/src/main/java/com/baeldung/java9/streams.reactive/TransformProcessor.java create mode 100644 core-java-9/src/test/java/com/baeldung/java9/streams.reactive/ReactiveStreamsTest.java diff --git a/core-java-9/src/main/java/com/baeldung/java9/streams.reactive/EndSubscriber.java b/core-java-9/src/main/java/com/baeldung/java9/streams.reactive/EndSubscriber.java new file mode 100644 index 0000000000..bbbb4f3411 --- /dev/null +++ b/core-java-9/src/main/java/com/baeldung/java9/streams.reactive/EndSubscriber.java @@ -0,0 +1,43 @@ +package com.baeldung.java9.streams.reactive; + +import java.util.LinkedList; +import java.util.List; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; +import java.util.concurrent.atomic.AtomicInteger; + +public class EndSubscriber implements Subscriber { + private final AtomicInteger howMuchMessagesToConsume; + private Subscription subscription; + public List consumedElements = new LinkedList<>(); + + public EndSubscriber(Integer howMuchMessagesToConsume) { + this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume); + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(T item) { + howMuchMessagesToConsume.decrementAndGet(); + System.out.println("Got : " + item); + consumedElements.add(item); + if (howMuchMessagesToConsume.get() > 0) { + subscription.request(1); + } + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + System.out.println("Done"); + } +} \ No newline at end of file diff --git a/core-java-9/src/main/java/com/baeldung/java9/streams.reactive/TransformProcessor.java b/core-java-9/src/main/java/com/baeldung/java9/streams.reactive/TransformProcessor.java new file mode 100644 index 0000000000..325979470f --- /dev/null +++ b/core-java-9/src/main/java/com/baeldung/java9/streams.reactive/TransformProcessor.java @@ -0,0 +1,38 @@ +package com.baeldung.java9.streams.reactive; + +import java.util.concurrent.Flow; +import java.util.concurrent.SubmissionPublisher; +import java.util.function.Function; + +public class TransformProcessor extends SubmissionPublisher implements Flow.Processor { + + private Function function; + private Flow.Subscription subscription; + + public TransformProcessor(Function function) { + super(); + this.function = function; + } + + @Override + public void onSubscribe(Flow.Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(T item) { + submit(function.apply(item)); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + close(); + } +} \ No newline at end of file diff --git a/core-java-9/src/test/java/com/baeldung/java9/streams.reactive/ReactiveStreamsTest.java b/core-java-9/src/test/java/com/baeldung/java9/streams.reactive/ReactiveStreamsTest.java new file mode 100644 index 0000000000..b35d2b78eb --- /dev/null +++ b/core-java-9/src/test/java/com/baeldung/java9/streams.reactive/ReactiveStreamsTest.java @@ -0,0 +1,69 @@ +package com.baeldung.java9.streams.reactive; + + +import org.junit.Test; + +import java.util.List; +import java.util.concurrent.SubmissionPublisher; + +import static org.assertj.core.api.Java6Assertions.assertThat; + +public class ReactiveStreamsTest { + + @Test + public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException { + //given + SubmissionPublisher publisher = new SubmissionPublisher<>(); + EndSubscriber subscriber = new EndSubscriber<>(6); + publisher.subscribe(subscriber); + List items = List.of("1", "x", "2", "x", "3", "x"); + + //when + assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); + items.forEach(publisher::submit); + publisher.close(); + + //then + Thread.sleep(1000); + assertThat(subscriber.consumedElements).containsExactlyElementsOf(items); + } + + @Test + public void givenPublisher_whenSubscribeAndTransformElements_thenShouldConsumeAllElements() throws InterruptedException { + //given + SubmissionPublisher publisher = new SubmissionPublisher<>(); + TransformProcessor transformProcessor = new TransformProcessor<>(Integer::parseInt); + EndSubscriber subscriber = new EndSubscriber<>(3); + List items = List.of("1", "2", "3"); + List expectedResult = List.of(1, 2, 3); + + //when + publisher.subscribe(transformProcessor); + transformProcessor.subscribe(subscriber); + items.forEach(publisher::submit); + publisher.close(); + + //then + Thread.sleep(1000); + assertThat(subscriber.consumedElements).containsExactlyElementsOf(expectedResult); + } + + @Test + public void givenPublisher_whenRequestForOnlyOneElement_thenShouldConsumeOnlyThatOne() throws InterruptedException { + //given + SubmissionPublisher publisher = new SubmissionPublisher<>(); + EndSubscriber subscriber = new EndSubscriber<>(1); + publisher.subscribe(subscriber); + List items = List.of("1", "x", "2", "x", "3", "x"); + List expected = List.of("1"); + + //when + assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1); + items.forEach(publisher::submit); + publisher.close(); + + //then + Thread.sleep(1000); + assertThat(subscriber.consumedElements).containsExactlyElementsOf(expected); + } +}