Incorporated the review comments on the pull request.

This commit is contained in:
CHANDRAKANT Kumar 2020-08-16 01:15:24 +05:30
parent 4daecbbc89
commit 3e2ce28afb
4 changed files with 24 additions and 46 deletions

View File

@ -1,25 +0,0 @@
/target/
!.mvn/wrapper/maven-wrapper.jar
### STS ###
.apt_generated
.classpath
.factorypath
.project
.settings
.springBeans
.sts4-cache
### IntelliJ IDEA ###
.idea
*.iws
*.iml
*.ipr
### NetBeans ###
/nbproject/private/
/build/
/nbbuild/
/dist/
/nbdist/
/.nb-gradle/

View File

@ -1,7 +1,7 @@
## Spring WebFlux Concurrency ## Spring WebFlux Concurrency
This module contains articles about concurrency model in Spring WebFlux. This module contains articles about concurrency model in Spring WebFlux.
Please note that this assumes Mongo and Kafka to be running on the local machine on default configurations.
### Relevant Articles: ### Relevant Articles:

View File

@ -3,6 +3,9 @@ package com.baeldung.webflux;
import org.springframework.boot.SpringApplication; import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* Please note that this assumes Mongo and Kafka to be running on the local machine on default configurations.
*/
@SpringBootApplication @SpringBootApplication
public class Application { public class Application {

View File

@ -53,31 +53,31 @@ public class Controller {
@GetMapping("/threads/webclient") @GetMapping("/threads/webclient")
public Flux<String> getThreadsWebClient() { public Flux<String> getThreadsWebClient() {
WebClient.create("http://localhost:8080/index") WebClient.create("http://localhost:8080/index")
.get() .get()
.retrieve() .retrieve()
.bodyToMono(String.class) .bodyToMono(String.class)
.subscribeOn(scheduler) .subscribeOn(scheduler)
.publishOn(scheduler) .publishOn(scheduler)
.doOnNext(s -> logger.info("Response: {}", s)) .doOnNext(s -> logger.info("Response: {}", s))
.subscribe(); .subscribe();
return Flux.fromIterable(getThreads()); return Flux.fromIterable(getThreads());
} }
@GetMapping("/threads/rxjava") @GetMapping("/threads/rxjava")
public Observable<String> getIndexRxJava() { public Observable<String> getIndexRxJava() {
Observable.fromIterable(Arrays.asList("Hello", "World")) Observable.fromIterable(Arrays.asList("Hello", "World"))
.map(s -> s.toUpperCase()) .map(s -> s.toUpperCase())
.observeOn(io.reactivex.schedulers.Schedulers.trampoline()) .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
.doOnNext(s -> logger.info("String: {}", s)) .doOnNext(s -> logger.info("String: {}", s))
.subscribe(); .subscribe();
return Observable.fromIterable(getThreads()); return Observable.fromIterable(getThreads());
} }
@GetMapping("/threads/mongodb") @GetMapping("/threads/mongodb")
public Flux<String> getIndexMongo() { public Flux<String> getIndexMongo() {
personRepository.findAll() personRepository.findAll()
.doOnNext(p -> logger.info("Person: {}", p)) .doOnNext(p -> logger.info("Person: {}", p))
.subscribe(); .subscribe();
return Flux.fromIterable(getThreads()); return Flux.fromIterable(getThreads());
} }
@ -90,9 +90,9 @@ public class Controller {
SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps); SenderOptions<Integer, String> senderOptions = SenderOptions.create(producerProps);
KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions); KafkaSender<Integer, String> sender = KafkaSender.create(senderOptions);
Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux.range(1, 10) Flux<SenderRecord<Integer, String, Integer>> outboundFlux = Flux.range(1, 10)
.map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i)); .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
sender.send(outboundFlux) sender.send(outboundFlux)
.subscribe(); .subscribe();
Map<String, Object> consumerProps = new HashMap<>(); Map<String, Object> consumerProps = new HashMap<>();
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
@ -108,7 +108,7 @@ public class Controller {
inboundFlux.subscribe(r -> { inboundFlux.subscribe(r -> {
logger.info("Received message: {}", r.value()); logger.info("Received message: {}", r.value());
r.receiverOffset() r.receiverOffset()
.acknowledge(); .acknowledge();
}); });
return Flux.fromIterable(getThreads()); return Flux.fromIterable(getThreads());
} }
@ -120,9 +120,9 @@ public class Controller {
private List<String> getThreads() { private List<String> getThreads() {
return Thread.getAllStackTraces() return Thread.getAllStackTraces()
.keySet() .keySet()
.stream() .stream()
.map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal")) .map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal"))
.collect(Collectors.toList()); .collect(Collectors.toList());
} }
} }