diff --git a/core-java-9/src/main/java/com/baeldung/java9/reactive/BaeldungBatchSubscriberImpl.java b/core-java-9/src/main/java/com/baeldung/java9/reactive/BaeldungBatchSubscriberImpl.java new file mode 100644 index 0000000000..46eee4883a --- /dev/null +++ b/core-java-9/src/main/java/com/baeldung/java9/reactive/BaeldungBatchSubscriberImpl.java @@ -0,0 +1,82 @@ +package com.baeldung.java9.reactive; + +import java.util.ArrayList; +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + +public class BaeldungBatchSubscriberImpl implements Subscriber { + private Subscription subscription; + private boolean completed = false; + private int counter; + private ArrayList buffer; + public static final int BUFFER_SIZE = 5; + + public BaeldungBatchSubscriberImpl() { + buffer = new ArrayList(); + } + + public boolean isCompleted() { + return completed; + } + + public void setCompleted(boolean completed) { + this.completed = completed; + } + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(BUFFER_SIZE); + } + + @Override + public void onNext(String item) { + buffer.add(item); + // if buffer is full, process the items. + if (buffer.size() >= BUFFER_SIZE) { + processBuffer(); + } + //request more items. + subscription.request(1); + } + + private void processBuffer() { + if (buffer.isEmpty()) + return; + // Process all items in the buffer. Here, we just print it and sleep for 1 second. + System.out.print("Processed items: "); + buffer.stream() + .forEach(item -> { + System.out.print(" " + item); + }); + System.out.println(); + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + counter = counter + buffer.size(); + buffer.clear(); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + completed = true; + // process any remaining items in buffer before + processBuffer(); + subscription.cancel(); + } +} diff --git a/core-java-9/src/main/java/com/baeldung/java9/reactive/BaeldungSubscriberImpl.java b/core-java-9/src/main/java/com/baeldung/java9/reactive/BaeldungSubscriberImpl.java new file mode 100644 index 0000000000..bacd777255 --- /dev/null +++ b/core-java-9/src/main/java/com/baeldung/java9/reactive/BaeldungSubscriberImpl.java @@ -0,0 +1,55 @@ +package com.baeldung.java9.reactive; + +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Subscription; + +public class BaeldungSubscriberImpl implements Subscriber { + private Subscription subscription; + private boolean completed = false; + private int counter; + + public boolean isCompleted() { + return completed; + } + + public void setCompleted(boolean completed) { + this.completed = completed; + } + + public int getCounter() { + return counter; + } + + public void setCounter(int counter) { + this.counter = counter; + } + + @Override + public void onSubscribe(Subscription subscription) { + this.subscription = subscription; + subscription.request(1); + } + + @Override + public void onNext(String item) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + counter++; + System.out.println("Processed item : " + item); + subscription.request(1); + } + + @Override + public void onError(Throwable t) { + t.printStackTrace(); + } + + @Override + public void onComplete() { + completed = true; + subscription.cancel(); + } +} diff --git a/core-java-9/src/test/java/com/baeldung/java9/reactive/BaeldungBatchSubscriberImplTest.java b/core-java-9/src/test/java/com/baeldung/java9/reactive/BaeldungBatchSubscriberImplTest.java new file mode 100644 index 0000000000..388d3efdd8 --- /dev/null +++ b/core-java-9/src/test/java/com/baeldung/java9/reactive/BaeldungBatchSubscriberImplTest.java @@ -0,0 +1,75 @@ +package com.baeldung.java9.reactive; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Stopwatch; + +public class BaeldungBatchSubscriberImplTest { + + private static final int ITEM_SIZE = 10; + private SubmissionPublisher publisher; + private BaeldungBatchSubscriberImpl subscriber; + + @Before + public void initialize() { + this.publisher = new SubmissionPublisher(ForkJoinPool.commonPool(), 6); + this.subscriber = new BaeldungBatchSubscriberImpl(); + publisher.subscribe(subscriber); + } + + @Rule + public Stopwatch stopwatch = new Stopwatch() { + + }; + + @Test + public void testReactiveStreamCount() { + IntStream.range(0, ITEM_SIZE) + .forEach(item -> publisher.submit(item + "")); + publisher.close(); + + do { + // wait for subscribers to complete all processing. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } while (!subscriber.isCompleted()); + + int count = subscriber.getCounter(); + + assertEquals(ITEM_SIZE, count); + } + + @Test + public void testReactiveStreamTime() { + IntStream.range(0, ITEM_SIZE) + .forEach(item -> publisher.submit(item + "")); + publisher.close(); + + do { + // wait for subscribers to complete all processing. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } while (!subscriber.isCompleted()); + + // The runtime in seconds should be equal to the number of items in each batch. + assertTrue(stopwatch.runtime(TimeUnit.SECONDS) >= (ITEM_SIZE / subscriber.BUFFER_SIZE)); + } + +} diff --git a/core-java-9/src/test/java/com/baeldung/java9/reactive/BaeldungSubscriberImplTest.java b/core-java-9/src/test/java/com/baeldung/java9/reactive/BaeldungSubscriberImplTest.java new file mode 100644 index 0000000000..5638c0a431 --- /dev/null +++ b/core-java-9/src/test/java/com/baeldung/java9/reactive/BaeldungSubscriberImplTest.java @@ -0,0 +1,100 @@ +package com.baeldung.java9.reactive; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.ForkJoinPool; +import java.util.concurrent.SubmissionPublisher; +import java.util.concurrent.TimeUnit; +import java.util.stream.IntStream; + +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Stopwatch; + +public class BaeldungSubscriberImplTest { + + private static final int ITEM_SIZE = 10; + private SubmissionPublisher publisher; + private BaeldungSubscriberImpl subscriber; + + @Before + public void initialize() { + // create Publisher with max buffer capacity 3. + this.publisher = new SubmissionPublisher(ForkJoinPool.commonPool(), 3); + this.subscriber = new BaeldungSubscriberImpl(); + publisher.subscribe(subscriber); + } + + @Rule + public Stopwatch stopwatch = new Stopwatch() { + + }; + + @Test + public void testReactiveStreamCount() { + IntStream.range(0, ITEM_SIZE) + .forEach(item -> publisher.submit(item + "")); + publisher.close(); + + do { + // wait for subscribers to complete all processing. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } while (!subscriber.isCompleted()); + + int count = subscriber.getCounter(); + + assertEquals(ITEM_SIZE, count); + } + + @Test + public void testReactiveStreamTime() { + IntStream.range(0, ITEM_SIZE) + .forEach(item -> publisher.submit(item + "")); + publisher.close(); + + do { + // wait for subscribers to complete all processing. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } while (!subscriber.isCompleted()); + + // The runtime in seconds should be equal to the number of items. + assertTrue(stopwatch.runtime(TimeUnit.SECONDS) >= ITEM_SIZE); + } + + @Test + public void testReactiveStreamOffer() { + IntStream.range(0, ITEM_SIZE) + .forEach(item -> publisher.offer(item + "", (subscriber, string) -> { + // Returning false means this item will be dropped (no retry), if blocked. + return false; + })); + publisher.close(); + + do { + // wait for subscribers to complete all processing. + try { + Thread.sleep(100); + } catch (InterruptedException e) { + // TODO Auto-generated catch block + e.printStackTrace(); + } + } while (!subscriber.isCompleted()); + + int count = subscriber.getCounter(); + // Because 10 items were offered and the buffer capacity was 3, few items will not be processed. + assertTrue(ITEM_SIZE > count); + } + +}