Programatical sequences in project reactor (#5482)

* Added programatically creating sequences in project reactor

* Added programatically creating sequences in project reactor
This commit is contained in:
mstefanec 2018-10-18 17:47:27 +02:00 committed by maibin
parent 8c3598b441
commit ef3614a5a0
4 changed files with 160 additions and 0 deletions

View File

@ -0,0 +1,44 @@
package com.baeldung.reactor;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class ItemProducerCreate {
Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class);
Consumer<List<String>> listener;
public void create() {
Flux<String> articlesFlux = Flux.create((sink) -> {
ItemProducerCreate.this.listener = (items) -> {
items.stream()
.forEach(article -> sink.next(article));
};
});
articlesFlux.subscribe(ItemProducerCreate.this.logger::info);
}
public static void main(String[] args) {
ItemProducerCreate producer = new ItemProducerCreate();
producer.create();
new Thread(new Runnable() {
@Override
public void run() {
List<String> items = new ArrayList<>();
items.add("Item 1");
items.add("Item 2");
items.add("Item 3");
producer.listener.accept(items);
}
}).start();
}
}

View File

@ -0,0 +1,34 @@
package com.baeldung.reactor;
import java.util.function.Consumer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
import reactor.core.publisher.FluxSink.OverflowStrategy;
public class NetworTrafficProducerPush {
Logger logger = LoggerFactory.getLogger(NetworTrafficProducerPush.class);
Consumer<String> listener;
public void subscribe(Consumer<String> consumer) {
Flux<String> flux = Flux.push(sink -> {
NetworTrafficProducerPush.this.listener = (t) -> sink.next(t);
}, OverflowStrategy.DROP);
flux.subscribe(consumer);
}
public void onPacket(String packet) {
listener.accept(packet);
}
public static void main(String[] args) {
NetworTrafficProducerPush trafficProducer = new NetworTrafficProducerPush();
trafficProducer.subscribe(trafficProducer.logger::info);
trafficProducer.onPacket("Packet[A18]");
}
}

View File

@ -0,0 +1,58 @@
package com.baeldung.reactor;
import java.util.concurrent.atomic.AtomicInteger;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class ProgramaticSequences {
Logger logger = LoggerFactory.getLogger(ProgramaticSequences.class);
public void statefullImutableGenerate() {
Flux<String> flux = Flux.generate(() -> 1, (state, sink) -> {
sink.next("2 + " + state + " = " + 2 + state);
if (state == 101)
sink.complete();
return state + 1;
});
flux.subscribe(logger::info);
}
public void statefullMutableGenerate() {
Flux<String> flux = Flux.generate(AtomicInteger::new, (state, sink) -> {
int i = state.getAndIncrement();
sink.next("2 + " + state + " = " + 2 + state);
if (i == 101)
sink.complete();
return state;
});
flux.subscribe(logger::info);
}
public void handle() {
Flux<String> elephants = Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.handle((i, sink) -> {
String animal = "Elephant nr " + i;
if (i % 2 == 0) {
sink.next(animal);
}
});
elephants.subscribe(logger::info);
}
public static void main(String[] args) {
ProgramaticSequences ps = new ProgramaticSequences();
ps.statefullImutableGenerate();
ps.statefullMutableGenerate();
ps.handle();
}
}

View File

@ -0,0 +1,24 @@
package com.baeldung.reactor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.core.publisher.Flux;
public class StatelessGenerate {
Logger logger = LoggerFactory.getLogger(StatelessGenerate.class);
public void statelessGenerate() {
Flux<String> flux = Flux.generate((sink) -> {
sink.next("hallo");
});
flux.subscribe(logger::info);
}
public static void main(String[] args) {
StatelessGenerate ps = new StatelessGenerate();
ps.statelessGenerate();
}
}