From a8b29dd1407b773bb7b447a11d8359767e5583e8 Mon Sep 17 00:00:00 2001 From: mstefanec <42640465+mstefanec@users.noreply.github.com> Date: Sun, 28 Oct 2018 06:49:40 +0100 Subject: [PATCH] Added UnitTest for programatically creating sequences in project reactor (#5552) --- .../baeldung/reactor/ItemProducerCreate.java | 32 ++-------- .../reactor/NetworTrafficProducerPush.java | 34 ----------- .../reactor/NetworkTrafficProducerPush.java | 23 ++++++++ .../reactor/ProgramaticSequences.java | 58 ------------------- .../reactor/ProgrammaticSequences.java | 38 ++++++++++++ .../baeldung/reactor/StatelessGenerate.java | 18 +----- .../reactor/ItemProducerCreateUnitTest.java | 33 +++++++++++ .../NetworkTrafficProducerPushUnitTest.java | 23 ++++++++ .../ProgrammaticSequencesUnitTest.java | 42 ++++++++++++++ 9 files changed, 167 insertions(+), 134 deletions(-) delete mode 100644 reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java create mode 100644 reactor-core/src/main/java/com/baeldung/reactor/NetworkTrafficProducerPush.java delete mode 100644 reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java create mode 100644 reactor-core/src/main/java/com/baeldung/reactor/ProgrammaticSequences.java create mode 100644 reactor-core/src/test/java/com/baeldung/reactor/ItemProducerCreateUnitTest.java create mode 100644 reactor-core/src/test/java/com/baeldung/reactor/NetworkTrafficProducerPushUnitTest.java create mode 100644 reactor-core/src/test/java/com/baeldung/reactor/ProgrammaticSequencesUnitTest.java diff --git a/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java b/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java index 6078f8ef0b..e475303a3d 100644 --- a/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java +++ b/reactor-core/src/main/java/com/baeldung/reactor/ItemProducerCreate.java @@ -1,44 +1,22 @@ package com.baeldung.reactor; -import java.util.ArrayList; -import java.util.List; -import java.util.function.Consumer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; -public class ItemProducerCreate { +import java.util.List; +import java.util.function.Consumer; - Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class); +public class ItemProducerCreate { Consumer> listener; - public void create() { + public Flux create() { Flux articlesFlux = Flux.create((sink) -> { ItemProducerCreate.this.listener = (items) -> { items.stream() .forEach(article -> sink.next(article)); }; }); - articlesFlux.subscribe(ItemProducerCreate.this.logger::info); + return articlesFlux; } - public static void main(String[] args) { - ItemProducerCreate producer = new ItemProducerCreate(); - producer.create(); - - new Thread(new Runnable() { - - @Override - public void run() { - List items = new ArrayList<>(); - items.add("Item 1"); - items.add("Item 2"); - items.add("Item 3"); - producer.listener.accept(items); - } - }).start(); - } } diff --git a/reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java b/reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java deleted file mode 100644 index 807ceae84d..0000000000 --- a/reactor-core/src/main/java/com/baeldung/reactor/NetworTrafficProducerPush.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.baeldung.reactor; - -import java.util.function.Consumer; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Flux; -import reactor.core.publisher.FluxSink.OverflowStrategy; - -public class NetworTrafficProducerPush { - - Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class); - - Consumer listener; - - public void subscribe(Consumer consumer) { - Flux flux = Flux.push(sink -> { - NetworTrafficProducerPush.this.listener = (t) -> sink.next(t); - }, OverflowStrategy.DROP); - flux.subscribe(consumer); - } - - public void onPacket(String packet) { - listener.accept(packet); - } - - public static void main(String[] args) { - NetworTrafficProducerPush trafficProducer = new NetworTrafficProducerPush(); - trafficProducer.subscribe(trafficProducer.logger::info); - trafficProducer.onPacket("Packet[A18]"); - } - -} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/NetworkTrafficProducerPush.java b/reactor-core/src/main/java/com/baeldung/reactor/NetworkTrafficProducerPush.java new file mode 100644 index 0000000000..a81d41ac11 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/NetworkTrafficProducerPush.java @@ -0,0 +1,23 @@ +package com.baeldung.reactor; + +import java.util.function.Consumer; + +import reactor.core.publisher.Flux; +import reactor.core.publisher.FluxSink.OverflowStrategy; + +public class NetworkTrafficProducerPush { + + Consumer listener; + + public void subscribe(Consumer consumer) { + Flux flux = Flux.push(sink -> { + NetworkTrafficProducerPush.this.listener = (t) -> sink.next(t); + }, OverflowStrategy.DROP); + flux.subscribe(consumer); + } + + public void onPacket(String packet) { + listener.accept(packet); + } + +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java b/reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java deleted file mode 100644 index b52def377d..0000000000 --- a/reactor-core/src/main/java/com/baeldung/reactor/ProgramaticSequences.java +++ /dev/null @@ -1,58 +0,0 @@ -package com.baeldung.reactor; - -import java.util.concurrent.atomic.AtomicInteger; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import reactor.core.publisher.Flux; - -public class ProgramaticSequences { - - Logger logger = LoggerFactory.getLogger(ProgramaticSequences.class); - - public void statefullImutableGenerate() { - Flux flux = Flux.generate(() -> 1, (state, sink) -> { - sink.next("2 + " + state + " = " + 2 + state); - if (state == 101) - sink.complete(); - return state + 1; - }); - - flux.subscribe(logger::info); - } - - public void statefullMutableGenerate() { - Flux flux = Flux.generate(AtomicInteger::new, (state, sink) -> { - int i = state.getAndIncrement(); - sink.next("2 + " + state + " = " + 2 + state); - if (i == 101) - sink.complete(); - return state; - }); - - flux.subscribe(logger::info); - } - - public void handle() { - Flux elephants = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) - .handle((i, sink) -> { - String animal = "Elephant nr " + i; - if (i % 2 == 0) { - sink.next(animal); - } - }); - - elephants.subscribe(logger::info); - } - - public static void main(String[] args) { - ProgramaticSequences ps = new ProgramaticSequences(); - - ps.statefullImutableGenerate(); - ps.statefullMutableGenerate(); - ps.handle(); - - } - -} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/ProgrammaticSequences.java b/reactor-core/src/main/java/com/baeldung/reactor/ProgrammaticSequences.java new file mode 100644 index 0000000000..5c11240753 --- /dev/null +++ b/reactor-core/src/main/java/com/baeldung/reactor/ProgrammaticSequences.java @@ -0,0 +1,38 @@ +package com.baeldung.reactor; + +import java.util.concurrent.atomic.AtomicInteger; + +import reactor.core.publisher.Flux; + +public class ProgrammaticSequences { + + public Flux statefulImutableGenerate() { + return Flux.generate(() -> 1, (state, sink) -> { + sink.next("2 + " + state + " = " + (2 + state)); + if (state == 101) + sink.complete(); + return state + 1; + }); + } + + public Flux statefulMutableGenerate() { + return Flux.generate(AtomicInteger::new, (state, sink) -> { + int i = state.getAndIncrement(); + sink.next("2 + " + i + " = " + (2 + i)); + if (i == 101) + sink.complete(); + return state; + }); + } + + public Flux handle() { + return Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10) + .handle((i, sink) -> { + String animal = "Elephant nr " + i; + if (i % 2 == 0) { + sink.next(animal); + } + }); + } + +} diff --git a/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java b/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java index c82f8e160b..3b9f0bb522 100644 --- a/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java +++ b/reactor-core/src/main/java/com/baeldung/reactor/StatelessGenerate.java @@ -1,24 +1,12 @@ package com.baeldung.reactor; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import reactor.core.publisher.Flux; public class StatelessGenerate { - Logger logger = LoggerFactory.getLogger(StatelessGenerate.class); - - public void statelessGenerate() { - Flux flux = Flux.generate((sink) -> { - sink.next("hallo"); + public Flux statelessGenerate() { + return Flux.generate((sink) -> { + sink.next("hello"); }); - flux.subscribe(logger::info); } - - public static void main(String[] args) { - StatelessGenerate ps = new StatelessGenerate(); - ps.statelessGenerate(); - } - } diff --git a/reactor-core/src/test/java/com/baeldung/reactor/ItemProducerCreateUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/ItemProducerCreateUnitTest.java new file mode 100644 index 0000000000..8c436306d1 --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/ItemProducerCreateUnitTest.java @@ -0,0 +1,33 @@ +package com.baeldung.reactor; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ItemProducerCreateUnitTest { + + @Test + public void givenFluxWithAsynchronousCreate_whenProduceItemsFromDifferentThread_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException { + List elements = new ArrayList<>(); + ItemProducerCreate producer = new ItemProducerCreate(); + producer.create() + .subscribe(elements::add); + + Thread producerThread = new Thread(() -> { + List items = new ArrayList<>(); + items.add("Item 1"); + items.add("Item 2"); + items.add("Item 3"); + producer.listener.accept(items); + }); + + producerThread.start(); + producerThread.join(); + + assertThat(elements).containsExactly("Item 1", "Item 2", "Item 3"); + } + +} diff --git a/reactor-core/src/test/java/com/baeldung/reactor/NetworkTrafficProducerPushUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/NetworkTrafficProducerPushUnitTest.java new file mode 100644 index 0000000000..168ab1f297 --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/NetworkTrafficProducerPushUnitTest.java @@ -0,0 +1,23 @@ +package com.baeldung.reactor; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class NetworkTrafficProducerPushUnitTest { + + @Test + public void givenFluxWithAsynchronousPushWithListener_whenListenerIsInvoked_thenItemCollectedByTheSubscriber() throws InterruptedException { + List elements = new ArrayList<>(); + + NetworkTrafficProducerPush trafficProducer = new NetworkTrafficProducerPush(); + trafficProducer.subscribe(elements::add); + trafficProducer.onPacket("Packet[A18]"); + + assertThat(elements).containsExactly("Packet[A18]"); + } + +} diff --git a/reactor-core/src/test/java/com/baeldung/reactor/ProgrammaticSequencesUnitTest.java b/reactor-core/src/test/java/com/baeldung/reactor/ProgrammaticSequencesUnitTest.java new file mode 100644 index 0000000000..996ca9e20c --- /dev/null +++ b/reactor-core/src/test/java/com/baeldung/reactor/ProgrammaticSequencesUnitTest.java @@ -0,0 +1,42 @@ +package com.baeldung.reactor; + +import org.junit.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ProgrammaticSequencesUnitTest { + + @Test + public void givenFluxWithStatefulImmutableGenerate_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException { + List elements = new ArrayList<>(); + ProgrammaticSequences producer = new ProgrammaticSequences(); + producer.statefulImutableGenerate() + .subscribe(elements::add); + assertThat(elements).hasSize(101); + assertThat(elements).contains("2 + 1 = 3", "2 + 101 = 103"); + } + + @Test + public void givenFluxWithStatefulMutableGenerate_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException { + List elements = new ArrayList<>(); + ProgrammaticSequences producer = new ProgrammaticSequences(); + producer.statefulMutableGenerate() + .subscribe(elements::add); + assertThat(elements).hasSize(102); + assertThat(elements).contains("2 + 0 = 2", "2 + 101 = 103"); + } + + @Test + public void givenFluxWithHandle_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException { + List elements = new ArrayList<>(); + ProgrammaticSequences producer = new ProgrammaticSequences(); + producer.handle() + .subscribe(elements::add); + assertThat(elements).hasSize(5); + assertThat(elements).contains("Elephant nr 2", "Elephant nr 10"); + } + +}