BAEL-679 code for reactive streams (#1970)

* BAEL-679 code for reactive streams

* BAEL-679 typo
This commit is contained in:
Tomasz Lelek 2017-06-02 18:48:17 +02:00 committed by Grzegorz Piwowarek
parent d614fc2fa7
commit 9184693a2b
3 changed files with 150 additions and 0 deletions

View File

@ -0,0 +1,43 @@
package com.baeldung.java9.streams.reactive;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.atomic.AtomicInteger;
public class EndSubscriber<T> implements Subscriber<T> {
private final AtomicInteger howMuchMessagesToConsume;
private Subscription subscription;
public List<T> consumedElements = new LinkedList<>();
public EndSubscriber(Integer howMuchMessagesToConsume) {
this.howMuchMessagesToConsume = new AtomicInteger(howMuchMessagesToConsume);
}
@Override
public void onSubscribe(Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
howMuchMessagesToConsume.decrementAndGet();
System.out.println("Got : " + item);
consumedElements.add(item);
if (howMuchMessagesToConsume.get() > 0) {
subscription.request(1);
}
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
System.out.println("Done");
}
}

View File

@ -0,0 +1,38 @@
package com.baeldung.java9.streams.reactive;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;
public class TransformProcessor<T, R> extends SubmissionPublisher<R> implements Flow.Processor<T, R> {
private Function<T, R> function;
private Flow.Subscription subscription;
public TransformProcessor(Function<T, R> function) {
super();
this.function = function;
}
@Override
public void onSubscribe(Flow.Subscription subscription) {
this.subscription = subscription;
subscription.request(1);
}
@Override
public void onNext(T item) {
submit(function.apply(item));
subscription.request(1);
}
@Override
public void onError(Throwable t) {
t.printStackTrace();
}
@Override
public void onComplete() {
close();
}
}

View File

@ -0,0 +1,69 @@
package com.baeldung.java9.streams.reactive;
import org.junit.Test;
import java.util.List;
import java.util.concurrent.SubmissionPublisher;
import static org.assertj.core.api.Java6Assertions.assertThat;
public class ReactiveStreamsTest {
@Test
public void givenPublisher_whenSubscribeToIt_thenShouldConsumeAllElements() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(6);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
Thread.sleep(1000);
assertThat(subscriber.consumedElements).containsExactlyElementsOf(items);
}
@Test
public void givenPublisher_whenSubscribeAndTransformElements_thenShouldConsumeAllElements() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
TransformProcessor<String, Integer> transformProcessor = new TransformProcessor<>(Integer::parseInt);
EndSubscriber<Integer> subscriber = new EndSubscriber<>(3);
List<String> items = List.of("1", "2", "3");
List<Integer> expectedResult = List.of(1, 2, 3);
//when
publisher.subscribe(transformProcessor);
transformProcessor.subscribe(subscriber);
items.forEach(publisher::submit);
publisher.close();
//then
Thread.sleep(1000);
assertThat(subscriber.consumedElements).containsExactlyElementsOf(expectedResult);
}
@Test
public void givenPublisher_whenRequestForOnlyOneElement_thenShouldConsumeOnlyThatOne() throws InterruptedException {
//given
SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
EndSubscriber<String> subscriber = new EndSubscriber<>(1);
publisher.subscribe(subscriber);
List<String> items = List.of("1", "x", "2", "x", "3", "x");
List<String> expected = List.of("1");
//when
assertThat(publisher.getNumberOfSubscribers()).isEqualTo(1);
items.forEach(publisher::submit);
publisher.close();
//then
Thread.sleep(1000);
assertThat(subscriber.consumedElements).containsExactlyElementsOf(expected);
}
}