BAEL-679. Java 9 Reactive Streams. Second draft.
This commit is contained in:
parent
a055ab5f81
commit
6def745640
|
@ -0,0 +1,84 @@
|
||||||
|
package com.baeldung.java9.reactive;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.concurrent.Flow.Subscriber;
|
||||||
|
import java.util.concurrent.Flow.Subscription;
|
||||||
|
|
||||||
|
public class BaeldungBatchSubscriberImpl<T> implements Subscriber<String> {
|
||||||
|
private Subscription subscription;
|
||||||
|
private boolean completed = false;
|
||||||
|
private int counter;
|
||||||
|
private ArrayList<String> buffer;
|
||||||
|
public static final int BUFFER_SIZE = 5;
|
||||||
|
|
||||||
|
public BaeldungBatchSubscriberImpl() {
|
||||||
|
buffer = new ArrayList<String>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isCompleted() {
|
||||||
|
return completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCompleted(boolean completed) {
|
||||||
|
this.completed = completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCounter() {
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCounter(int counter) {
|
||||||
|
this.counter = counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Subscription subscription) {
|
||||||
|
this.subscription = subscription;
|
||||||
|
subscription.request(BUFFER_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(String item) {
|
||||||
|
buffer.add(item);
|
||||||
|
// if buffer is full, process the items.
|
||||||
|
if (buffer.size() >= BUFFER_SIZE) {
|
||||||
|
processBuffer();
|
||||||
|
subscription.request(BUFFER_SIZE);
|
||||||
|
} else if(buffer.size() == 0) {
|
||||||
|
// If buffer empty, request more items.
|
||||||
|
subscription.request(BUFFER_SIZE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void processBuffer() {
|
||||||
|
if (buffer.isEmpty())
|
||||||
|
return;
|
||||||
|
// Process all items in the buffer. Here, we just print it and sleep for 1 second.
|
||||||
|
System.out.print("Processed items: ");
|
||||||
|
buffer.stream()
|
||||||
|
.forEach(item -> {
|
||||||
|
System.out.print(" " + item);
|
||||||
|
});
|
||||||
|
System.out.println();
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
counter = counter + buffer.size();
|
||||||
|
buffer.clear();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
t.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
completed = true;
|
||||||
|
// process any remaining items in buffer before
|
||||||
|
processBuffer();
|
||||||
|
subscription.cancel();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,55 @@
|
||||||
|
package com.baeldung.java9.reactive;
|
||||||
|
|
||||||
|
import java.util.concurrent.Flow.Subscriber;
|
||||||
|
import java.util.concurrent.Flow.Subscription;
|
||||||
|
|
||||||
|
public class BaeldungSubscriberImpl<T> implements Subscriber<String> {
|
||||||
|
private Subscription subscription;
|
||||||
|
private boolean completed = false;
|
||||||
|
private int counter;
|
||||||
|
|
||||||
|
public boolean isCompleted() {
|
||||||
|
return completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCompleted(boolean completed) {
|
||||||
|
this.completed = completed;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getCounter() {
|
||||||
|
return counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setCounter(int counter) {
|
||||||
|
this.counter = counter;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onSubscribe(Subscription subscription) {
|
||||||
|
this.subscription = subscription;
|
||||||
|
subscription.request(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onNext(String item) {
|
||||||
|
try {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
counter++;
|
||||||
|
System.out.println("Processed item : " + item);
|
||||||
|
subscription.request(1);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onError(Throwable t) {
|
||||||
|
t.printStackTrace();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void onComplete() {
|
||||||
|
completed = true;
|
||||||
|
subscription.cancel();
|
||||||
|
}
|
||||||
|
}
|
|
@ -0,0 +1,75 @@
|
||||||
|
package com.baeldung.java9.reactive;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
|
import java.util.concurrent.SubmissionPublisher;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Stopwatch;
|
||||||
|
|
||||||
|
public class BaeldungBatchSubscriberImplTest {
|
||||||
|
|
||||||
|
private static final int ITEM_SIZE = 10;
|
||||||
|
private SubmissionPublisher<String> publisher;
|
||||||
|
private BaeldungBatchSubscriberImpl<String> subscriber;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initialize() {
|
||||||
|
this.publisher = new SubmissionPublisher<String>(ForkJoinPool.commonPool(), 6);
|
||||||
|
this.subscriber = new BaeldungBatchSubscriberImpl<String>();
|
||||||
|
publisher.subscribe(subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Stopwatch stopwatch = new Stopwatch() {
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReactiveStreamCount() {
|
||||||
|
IntStream.range(0, ITEM_SIZE)
|
||||||
|
.forEach(item -> publisher.submit(item + ""));
|
||||||
|
publisher.close();
|
||||||
|
|
||||||
|
do {
|
||||||
|
// wait for subscribers to complete all processing.
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
} while (!subscriber.isCompleted());
|
||||||
|
|
||||||
|
int count = subscriber.getCounter();
|
||||||
|
|
||||||
|
assertEquals(ITEM_SIZE, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReactiveStreamTime() {
|
||||||
|
IntStream.range(0, ITEM_SIZE)
|
||||||
|
.forEach(item -> publisher.submit(item + ""));
|
||||||
|
publisher.close();
|
||||||
|
|
||||||
|
do {
|
||||||
|
// wait for subscribers to complete all processing.
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
} while (!subscriber.isCompleted());
|
||||||
|
|
||||||
|
// The runtime in seconds should be equal to the number of items in each batch.
|
||||||
|
assertTrue(stopwatch.runtime(TimeUnit.SECONDS) >= (ITEM_SIZE / subscriber.BUFFER_SIZE));
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
|
@ -0,0 +1,100 @@
|
||||||
|
package com.baeldung.java9.reactive;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
import java.util.concurrent.ForkJoinPool;
|
||||||
|
import java.util.concurrent.SubmissionPublisher;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
|
import java.util.stream.IntStream;
|
||||||
|
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Rule;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.Stopwatch;
|
||||||
|
|
||||||
|
public class BaeldungSubscriberImplTest {
|
||||||
|
|
||||||
|
private static final int ITEM_SIZE = 10;
|
||||||
|
private SubmissionPublisher<String> publisher;
|
||||||
|
private BaeldungSubscriberImpl<String> subscriber;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void initialize() {
|
||||||
|
// create Publisher with max buffer capacity 3.
|
||||||
|
this.publisher = new SubmissionPublisher<String>(ForkJoinPool.commonPool(), 3);
|
||||||
|
this.subscriber = new BaeldungSubscriberImpl<String>();
|
||||||
|
publisher.subscribe(subscriber);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public Stopwatch stopwatch = new Stopwatch() {
|
||||||
|
|
||||||
|
};
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReactiveStreamCount() {
|
||||||
|
IntStream.range(0, ITEM_SIZE)
|
||||||
|
.forEach(item -> publisher.submit(item + ""));
|
||||||
|
publisher.close();
|
||||||
|
|
||||||
|
do {
|
||||||
|
// wait for subscribers to complete all processing.
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
} while (!subscriber.isCompleted());
|
||||||
|
|
||||||
|
int count = subscriber.getCounter();
|
||||||
|
|
||||||
|
assertEquals(ITEM_SIZE, count);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReactiveStreamTime() {
|
||||||
|
IntStream.range(0, ITEM_SIZE)
|
||||||
|
.forEach(item -> publisher.submit(item + ""));
|
||||||
|
publisher.close();
|
||||||
|
|
||||||
|
do {
|
||||||
|
// wait for subscribers to complete all processing.
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
} while (!subscriber.isCompleted());
|
||||||
|
|
||||||
|
// The runtime in seconds should be equal to the number of items.
|
||||||
|
assertTrue(stopwatch.runtime(TimeUnit.SECONDS) >= ITEM_SIZE);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testReactiveStreamOffer() {
|
||||||
|
IntStream.range(0, ITEM_SIZE)
|
||||||
|
.forEach(item -> publisher.offer(item + "", (subscriber, string) -> {
|
||||||
|
// Returning false means this item will be dropped (no retry), if blocked.
|
||||||
|
return false;
|
||||||
|
}));
|
||||||
|
publisher.close();
|
||||||
|
|
||||||
|
do {
|
||||||
|
// wait for subscribers to complete all processing.
|
||||||
|
try {
|
||||||
|
Thread.sleep(100);
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
// TODO Auto-generated catch block
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
} while (!subscriber.isCompleted());
|
||||||
|
|
||||||
|
int count = subscriber.getCounter();
|
||||||
|
// Because 10 items were offered and the buffer capacity was 3, few items will not be processed.
|
||||||
|
assertTrue(ITEM_SIZE > count);
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue