diff --git a/pom.xml b/pom.xml index a69ffa2798..3565c2dc4b 100644 --- a/pom.xml +++ b/pom.xml @@ -556,6 +556,7 @@ atomikos reactive-systems slack + spring-webflux-threads @@ -1067,6 +1068,7 @@ atomikos reactive-systems slack + spring-webflux-threads diff --git a/spring-webflux-threads/.gitignore b/spring-webflux-threads/.gitignore new file mode 100644 index 0000000000..82eca336e3 --- /dev/null +++ b/spring-webflux-threads/.gitignore @@ -0,0 +1,25 @@ +/target/ +!.mvn/wrapper/maven-wrapper.jar + +### STS ### +.apt_generated +.classpath +.factorypath +.project +.settings +.springBeans +.sts4-cache + +### IntelliJ IDEA ### +.idea +*.iws +*.iml +*.ipr + +### NetBeans ### +/nbproject/private/ +/build/ +/nbbuild/ +/dist/ +/nbdist/ +/.nb-gradle/ \ No newline at end of file diff --git a/spring-webflux-threads/README.md b/spring-webflux-threads/README.md new file mode 100644 index 0000000000..ab64d897cc --- /dev/null +++ b/spring-webflux-threads/README.md @@ -0,0 +1,7 @@ +## Spring WebFlux Concurrency + +This module contains articles about consurrency model in Spring WebFlux + +### Relevant Articles: + +- [Concurrency in Spring WebFlux]() diff --git a/spring-webflux-threads/pom.xml b/spring-webflux-threads/pom.xml new file mode 100644 index 0000000000..e5b5bafd3b --- /dev/null +++ b/spring-webflux-threads/pom.xml @@ -0,0 +1,87 @@ + + + 4.0.0 + com.baeldung.spring + spring-webflux-threads + 1.0.0-SNAPSHOT + spring-webflux-threads + jar + Spring WebFlux AMQP Sample + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../parent-boot-2 + + + + + org.springframework.boot + spring-boot-starter-webflux + + + + + + io.reactivex.rxjava2 + rxjava + 2.2.19 + + + org.springframework.boot + spring-boot-starter-data-mongodb-reactive + + + io.projectreactor.kafka + reactor-kafka + 1.2.2.RELEASE + + + com.fasterxml.jackson.core + jackson-databind + + + org.springframework.boot + spring-boot-starter-test + test + + + io.projectreactor + reactor-test + test + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + + diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Application.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Application.java new file mode 100644 index 0000000000..1dfa00eae0 --- /dev/null +++ b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Application.java @@ -0,0 +1,13 @@ +package com.baeldung.webflux; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Controller.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Controller.java new file mode 100644 index 0000000000..7036deb998 --- /dev/null +++ b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Controller.java @@ -0,0 +1,128 @@ +package com.baeldung.webflux; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; + +import io.reactivex.Observable; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; + +@RestController +@RequestMapping("/") +public class Controller { + + @Autowired + private PersonRepository personRepository; + + private Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup"); + + private Logger logger = LoggerFactory.getLogger(Controller.class); + + @GetMapping("/threads/webflux") + public Flux getThreadsWebflux() { + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/threads/webclient") + public Flux getThreadsWebClient() { + WebClient.create("http://localhost:8080/index") + .get() + .retrieve() + .bodyToMono(String.class) + .subscribeOn(scheduler) + .publishOn(scheduler) + .doOnNext(s -> logger.info("Response: {}", s)) + .subscribe(); + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/threads/rxjava") + public Observable getIndexRxJava() { + Observable.fromIterable(Arrays.asList("Hello", "World")) + .map(s -> s.toUpperCase()) + .observeOn(io.reactivex.schedulers.Schedulers.trampoline()) + .doOnNext(s -> logger.info("String: {}", s)) + .subscribe(); + return Observable.fromIterable(getThreads()); + } + + @GetMapping("/threads/mongodb") + public Flux getIndexMongo() { + personRepository.findAll() + .doOnNext(p -> logger.info("Person: {}", p)) + .subscribe(); + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/thareds/reactor-kafka") + public Flux getIndexKafka() { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + SenderOptions senderOptions = SenderOptions.create(producerProps); + KafkaSender sender = KafkaSender.create(senderOptions); + Flux> outboundFlux = Flux.range(1, 10) + .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i)); + sender.send(outboundFlux) + .subscribe(); + + Map consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps); + receiverOptions.subscription(Collections.singleton("reactive-test")); + KafkaReceiver receiver = KafkaReceiver.create(receiverOptions); + Flux> inboundFlux = receiver.receive(); + inboundFlux.subscribe(r -> { + logger.info("Received message: {}", r.value()); + r.receiverOffset() + .acknowledge(); + }); + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/index") + public Mono getIndex() { + return Mono.just("Hello world!"); + } + + private List getThreads() { + return Thread.getAllStackTraces() + .keySet() + .stream() + .map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal")) + .collect(Collectors.toList()); + } +} diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/Person.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Person.java new file mode 100644 index 0000000000..4c6bd5f585 --- /dev/null +++ b/spring-webflux-threads/src/main/java/com/baeldung/webflux/Person.java @@ -0,0 +1,27 @@ +package com.baeldung.webflux; + +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +@Document +public class Person { + @Id + String id; + + public Person(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public String toString() { + return "Person{" + "id='" + id + '\'' + '}'; + } +} diff --git a/spring-webflux-threads/src/main/java/com/baeldung/webflux/PersonRepository.java b/spring-webflux-threads/src/main/java/com/baeldung/webflux/PersonRepository.java new file mode 100644 index 0000000000..38fbd3d431 --- /dev/null +++ b/spring-webflux-threads/src/main/java/com/baeldung/webflux/PersonRepository.java @@ -0,0 +1,6 @@ +package com.baeldung.webflux; + +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; + +public interface PersonRepository extends ReactiveMongoRepository { +} diff --git a/spring-webflux-threads/src/main/resources/application.yml b/spring-webflux-threads/src/main/resources/application.yml new file mode 100644 index 0000000000..5addcff6c2 --- /dev/null +++ b/spring-webflux-threads/src/main/resources/application.yml @@ -0,0 +1,7 @@ + + + + + + + diff --git a/spring-webflux-threads/src/main/resources/logback.xml b/spring-webflux-threads/src/main/resources/logback.xml new file mode 100644 index 0000000000..7d900d8ea8 --- /dev/null +++ b/spring-webflux-threads/src/main/resources/logback.xml @@ -0,0 +1,13 @@ + + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + \ No newline at end of file