Added UnitTest for programatically creating sequences in project reactor (#5552)

This commit is contained in:
mstefanec 2018-10-28 06:49:40 +01:00 committed by maibin
parent 716e1ac44b
commit a8b29dd140
9 changed files with 167 additions and 134 deletions

View File

@ -1,44 +1,22 @@
package com.baeldung.reactor; package com.baeldung.reactor;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public class ItemProducerCreate { import java.util.List;
import java.util.function.Consumer;
Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class); public class ItemProducerCreate {
Consumer<List<String>> listener; Consumer<List<String>> listener;
public void create() { public Flux<String> create() {
Flux<String> articlesFlux = Flux.create((sink) -> { Flux<String> articlesFlux = Flux.create((sink) -> {
ItemProducerCreate.this.listener = (items) -> { ItemProducerCreate.this.listener = (items) -> {
items.stream() items.stream()
.forEach(article -> sink.next(article)); .forEach(article -> sink.next(article));
}; };
}); });
articlesFlux.subscribe(ItemProducerCreate.this.logger::info); return articlesFlux;
} }
public static void main(String[] args) {
ItemProducerCreate producer = new ItemProducerCreate();
producer.create();
new Thread(new Runnable() {
@Override
public void run() {
List<String> items = new ArrayList<>();
items.add("Item 1");
items.add("Item 2");
items.add("Item 3");
producer.listener.accept(items);
}
}).start();
}
} }

View File

@ -1,34 +0,0 @@
package com.baeldung.reactor;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
public class NetworTrafficProducerPush {
Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class);
Consumer<String> listener;
public void subscribe(Consumer<String> consumer) {
Flux<String> flux = Flux.push(sink -> {
NetworTrafficProducerPush.this.listener = (t) -> sink.next(t);
}, OverflowStrategy.DROP);
flux.subscribe(consumer);
}
public void onPacket(String packet) {
listener.accept(packet);
}
public static void main(String[] args) {
NetworTrafficProducerPush trafficProducer = new NetworTrafficProducerPush();
trafficProducer.subscribe(trafficProducer.logger::info);
trafficProducer.onPacket("Packet[A18]");
}
}

View File

@ -0,0 +1,23 @@
package com.baeldung.reactor;
import java.util.function.Consumer;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
public class NetworkTrafficProducerPush {
Consumer<String> listener;
public void subscribe(Consumer<String> consumer) {
Flux<String> flux = Flux.push(sink -> {
NetworkTrafficProducerPush.this.listener = (t) -> sink.next(t);
}, OverflowStrategy.DROP);
flux.subscribe(consumer);
}
public void onPacket(String packet) {
listener.accept(packet);
}
}

View File

@ -1,58 +0,0 @@
package com.baeldung.reactor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class ProgramaticSequences {
Logger logger = LoggerFactory.getLogger(ProgramaticSequences.class);
public void statefullImutableGenerate() {
Flux<String> flux = Flux.generate(() -> 1, (state, sink) -> {
sink.next("2 + " + state + " = " + 2 + state);
if (state == 101)
sink.complete();
return state + 1;
});
flux.subscribe(logger::info);
}
public void statefullMutableGenerate() {
Flux<String> flux = Flux.generate(AtomicInteger::new, (state, sink) -> {
int i = state.getAndIncrement();
sink.next("2 + " + state + " = " + 2 + state);
if (i == 101)
sink.complete();
return state;
});
flux.subscribe(logger::info);
}
public void handle() {
Flux<String> elephants = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.handle((i, sink) -> {
String animal = "Elephant nr " + i;
if (i % 2 == 0) {
sink.next(animal);
}
});
elephants.subscribe(logger::info);
}
public static void main(String[] args) {
ProgramaticSequences ps = new ProgramaticSequences();
ps.statefullImutableGenerate();
ps.statefullMutableGenerate();
ps.handle();
}
}

View File

@ -0,0 +1,38 @@
package com.baeldung.reactor;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
public class ProgrammaticSequences {
public Flux<String> statefulImutableGenerate() {
return Flux.generate(() -> 1, (state, sink) -> {
sink.next("2 + " + state + " = " + (2 + state));
if (state == 101)
sink.complete();
return state + 1;
});
}
public Flux<String> statefulMutableGenerate() {
return Flux.generate(AtomicInteger::new, (state, sink) -> {
int i = state.getAndIncrement();
sink.next("2 + " + i + " = " + (2 + i));
if (i == 101)
sink.complete();
return state;
});
}
public Flux<String> handle() {
return Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.handle((i, sink) -> {
String animal = "Elephant nr " + i;
if (i % 2 == 0) {
sink.next(animal);
}
});
}
}

View File

@ -1,24 +1,12 @@
package com.baeldung.reactor; package com.baeldung.reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux; import reactor.core.publisher.Flux;
public class StatelessGenerate { public class StatelessGenerate {
Logger logger = LoggerFactory.getLogger(StatelessGenerate.class); public Flux<String> statelessGenerate() {
return Flux.generate((sink) -> {
public void statelessGenerate() { sink.next("hello");
Flux<String> flux = Flux.generate((sink) -> {
sink.next("hallo");
}); });
flux.subscribe(logger::info);
} }
public static void main(String[] args) {
StatelessGenerate ps = new StatelessGenerate();
ps.statelessGenerate();
}
} }

View File

@ -0,0 +1,33 @@
package com.baeldung.reactor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class ItemProducerCreateUnitTest {
@Test
public void givenFluxWithAsynchronousCreate_whenProduceItemsFromDifferentThread_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ItemProducerCreate producer = new ItemProducerCreate();
producer.create()
.subscribe(elements::add);
Thread producerThread = new Thread(() -> {
List<String> items = new ArrayList<>();
items.add("Item 1");
items.add("Item 2");
items.add("Item 3");
producer.listener.accept(items);
});
producerThread.start();
producerThread.join();
assertThat(elements).containsExactly("Item 1", "Item 2", "Item 3");
}
}

View File

@ -0,0 +1,23 @@
package com.baeldung.reactor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class NetworkTrafficProducerPushUnitTest {
@Test
public void givenFluxWithAsynchronousPushWithListener_whenListenerIsInvoked_thenItemCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
NetworkTrafficProducerPush trafficProducer = new NetworkTrafficProducerPush();
trafficProducer.subscribe(elements::add);
trafficProducer.onPacket("Packet[A18]");
assertThat(elements).containsExactly("Packet[A18]");
}
}

View File

@ -0,0 +1,42 @@
package com.baeldung.reactor;
import org.junit.Test;
import java.util.ArrayList;
import java.util.List;
import static org.assertj.core.api.Assertions.assertThat;
public class ProgrammaticSequencesUnitTest {
@Test
public void givenFluxWithStatefulImmutableGenerate_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ProgrammaticSequences producer = new ProgrammaticSequences();
producer.statefulImutableGenerate()
.subscribe(elements::add);
assertThat(elements).hasSize(101);
assertThat(elements).contains("2 + 1 = 3", "2 + 101 = 103");
}
@Test
public void givenFluxWithStatefulMutableGenerate_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ProgrammaticSequences producer = new ProgrammaticSequences();
producer.statefulMutableGenerate()
.subscribe(elements::add);
assertThat(elements).hasSize(102);
assertThat(elements).contains("2 + 0 = 2", "2 + 101 = 103");
}
@Test
public void givenFluxWithHandle_whenSubscribeAddItemsToCollect_thenAllItemsAreCollectedByTheSubscriber() throws InterruptedException {
List<String> elements = new ArrayList<>();
ProgrammaticSequences producer = new ProgrammaticSequences();
producer.handle()
.subscribe(elements::add);
assertThat(elements).hasSize(5);
assertThat(elements).contains("Elephant nr 2", "Elephant nr 10");
}
}