From 0cb91257011f91f287a26f25437c94ef1e77b6c9 Mon Sep 17 00:00:00 2001
From: chaos2418 <>
Date: Wed, 15 Dec 2021 09:22:21 +0530
Subject: [PATCH 1/2] JAVA-3060: adding new spring-reactive module for the
ebook.
---
pom.xml | 2 +
spring-reactive/README.md | 17 +
spring-reactive/pom.xml | 66 ++++
.../reactive/concurrency/Application.java | 17 +
.../reactive/concurrency/Controller.java | 127 +++++++
.../baeldung/reactive/concurrency/Person.java | 27 ++
.../concurrency/PersonRepository.java | 6 +
.../ConsumerDebuggingApplication.java | 33 ++
.../consumer/chronjobs/ChronJobs.java | 152 ++++++++
.../ReactiveConfigsToggleRestController.java | 22 ++
.../debugging/consumer/model/Foo.java | 27 ++
.../debugging/consumer/model/FooDto.java | 16 +
.../consumer/service/FooNameHelper.java | 44 +++
.../consumer/service/FooQuantityHelper.java | 30 ++
.../consumer/service/FooReporter.java | 24 ++
.../consumer/service/FooService.java | 118 +++++++
.../server/ServerDebuggingApplication.java | 29 ++
.../server/handlers/ServerHandler.java | 45 +++
.../reactive/debugging/server/model/Foo.java | 13 +
.../server/routers/ServerRouter.java | 21 ++
.../ErrorHandlingApplication.java | 24 ++
.../errorhandling/GlobalErrorAttributes.java | 52 +++
.../GlobalErrorWebExceptionHandler.java | 48 +++
.../errorhandling/NameRequiredException.java | 11 +
.../errorhandling/handlers/Handler1.java | 28 ++
.../errorhandling/handlers/Handler2.java | 37 ++
.../errorhandling/handlers/Handler3.java | 33 ++
.../errorhandling/handlers/Handler4.java | 29 ++
.../errorhandling/handlers/Handler5.java | 21 ++
.../errorhandling/routers/Router1.java | 21 ++
.../errorhandling/routers/Router2.java | 21 ++
.../errorhandling/routers/Router3.java | 21 ++
.../errorhandling/routers/Router4.java | 21 ++
.../errorhandling/routers/Router5.java | 21 ++
.../reactive/security/GreetController.java | 37 ++
.../reactive/security/GreetService.java | 15 +
.../reactive/security/SecurityConfig.java | 55 +++
.../security/SpringSecurity5Application.java | 34 ++
.../com/baeldung/reactive/webclient/Foo.java | 24 ++
.../baeldung/reactive/webclient/Tweet.java | 13 +
.../TweetsSlowServiceController.java | 20 ++
.../webclient/WebClientApplication.java | 14 +
.../webclient/WebClientController.java | 41 +++
.../reactive/webclient/WebController.java | 60 ++++
.../SpringWebClientRequestsApp.java | 12 +
.../baeldung/reactive/webflux/Employee.java | 17 +
.../reactive/webflux/EmployeeRepository.java | 59 ++++
.../annotation/EmployeeController.java | 39 +++
.../annotation/EmployeeSpringApplication.java | 16 +
.../webflux/annotation/EmployeeWebClient.java | 32 ++
.../annotation/EmployeeWebSecurityConfig.java | 45 +++
.../functional/EmployeeFunctionalConfig.java | 74 ++++
.../EmployeeSpringFunctionalApplication.java | 13 +
.../ConsumerFooServiceIntegrationTest.java | 62 ++++
.../consumer/ConsumerFooServiceLiveTest.java | 53 +++
.../consumer/utils/ListAppender.java | 25 ++
.../introduction/ReactorIntegrationTest.java | 123 +++++++
.../security/SecurityIntegrationTest.java | 37 ++
.../reactive/webclient/SpringContextTest.java | 12 +
.../webclient/WebClientIntegrationTest.java | 325 ++++++++++++++++++
.../WebControllerIntegrationTest.java | 51 +++
.../WebTestClientIntegrationTest.java | 98 ++++++
.../WebClientRequestsUnitTest.java | 176 ++++++++++
.../EmployeeControllerIntegrationTest.java | 74 ++++
...ployeeSpringFunctionalIntegrationTest.java | 92 +++++
.../src/test/resources/logback-test.xml | 12 +
66 files changed, 2984 insertions(+)
create mode 100644 spring-reactive/README.md
create mode 100644 spring-reactive/pom.xml
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java
create mode 100644 spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebControllerIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebTestClientIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webclientrequests/WebClientRequestsUnitTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webflux/annotation/EmployeeControllerIntegrationTest.java
create mode 100644 spring-reactive/src/test/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalIntegrationTest.java
create mode 100644 spring-reactive/src/test/resources/logback-test.xml
diff --git a/pom.xml b/pom.xml
index ad002650d7..3770c05b49 100644
--- a/pom.xml
+++ b/pom.xml
@@ -619,6 +619,7 @@
spring-5-reactive-security
spring-5-webflux
spring-5-webflux-2
+ spring-reactive
spring-activiti
spring-akka
@@ -1089,6 +1090,7 @@
spring-5-reactive-oauth
spring-5-reactive-security
spring-5-webflux
+ spring-reactive
spring-activiti
spring-akka
diff --git a/spring-reactive/README.md b/spring-reactive/README.md
new file mode 100644
index 0000000000..6e887fe2f8
--- /dev/null
+++ b/spring-reactive/README.md
@@ -0,0 +1,17 @@
+## Spring Reactive
+
+This module contains articles about Spring Reactor.
+
+## Relevant articles:
+
+- [Introduction to Project Reactor Bus](https://www.baeldung.com/reactor-bus)
+- [Intro To Reactor Core](https://www.baeldung.com/reactor-core)
+- [Debugging Reactive Streams in Java](https://www.baeldung.com/spring-debugging-reactive-streams)
+- [Guide to Spring 5 WebFlux](https://www.baeldung.com/spring-webflux)
+- [Introduction to the Functional Web Framework in Spring 5](https://www.baeldung.com/spring-5-functional-web)
+- [Spring 5 WebClient](https://www.baeldung.com/spring-5-webclient)
+- [Spring WebClient vs. RestTemplate](https://www.baeldung.com/spring-webclient-resttemplate)
+- [https://www.baeldung.com/webflux-webclient-parameters](https://www.baeldung.com/webflux-webclient-parameters)
+- [Handling Errors in Spring WebFlux](https://www.baeldung.com/spring-webflux-errors)
+- [Spring Security 5 for Reactive Applications](https://www.baeldung.com/spring-security-5-reactive)
+- [https://www.baeldung.com/spring-webflux-concurrency](https://www.baeldung.com/spring-webflux-concurrency)
\ No newline at end of file
diff --git a/spring-reactive/pom.xml b/spring-reactive/pom.xml
new file mode 100644
index 0000000000..a4d375ea15
--- /dev/null
+++ b/spring-reactive/pom.xml
@@ -0,0 +1,66 @@
+
+
+ 4.0.0
+
+
+ com.baeldung
+ parent-boot-2
+ 0.0.1-SNAPSHOT
+ ../parent-boot-2
+
+
+ spring-reactive
+
+
+
+ org.springframework.boot
+ spring-boot-starter-webflux
+
+
+ org.springframework.boot
+ spring-boot-starter-validation
+
+
+ org.springframework.boot
+ spring-boot-starter-security
+
+
+ io.reactivex.rxjava2
+ rxjava
+ ${rxjava.version}
+
+
+ io.projectreactor.kafka
+ reactor-kafka
+ ${reactor-kafka.version}
+
+
+ org.springframework.boot
+ spring-boot-starter-data-mongodb-reactive
+
+
+ org.springframework.security
+ spring-security-test
+ test
+
+
+ io.projectreactor
+ reactor-test
+ ${reactor.version}
+ test
+
+
+ org.projectlombok
+ lombok
+
+
+
+
+ 3.4.12
+ 1.2.2.RELEASE
+ 2.2.19
+
+
+
\ No newline at end of file
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java
new file mode 100644
index 0000000000..f34d930ef0
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java
@@ -0,0 +1,17 @@
+package com.baeldung.reactive.concurrency;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+/**
+* Please note we assume Mongo and Kafka are running in the local machine and on default configuration.
+* Additionally, if you want to experiment with Tomcat/Jetty instead of Netty, just uncomment the lines in pom.xml and rebuild.
+*/
+@SpringBootApplication
+public class Application {
+
+ public static void main(String[] args) {
+ SpringApplication.run(Application.class, args);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java
new file mode 100644
index 0000000000..70928d4dca
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java
@@ -0,0 +1,127 @@
+package com.baeldung.reactive.concurrency;
+
+import io.reactivex.Observable;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.IntegerDeserializer;
+import org.apache.kafka.common.serialization.IntegerSerializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Scheduler;
+import reactor.core.scheduler.Schedulers;
+import reactor.kafka.receiver.KafkaReceiver;
+import reactor.kafka.receiver.ReceiverOptions;
+import reactor.kafka.receiver.ReceiverRecord;
+import reactor.kafka.sender.KafkaSender;
+import reactor.kafka.sender.SenderOptions;
+import reactor.kafka.sender.SenderRecord;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+@RestController
+@RequestMapping("/")
+public class Controller {
+
+ @Autowired
+ private PersonRepository personRepository;
+
+ private Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup");
+
+ private Logger logger = LoggerFactory.getLogger(Controller.class);
+
+ @GetMapping("/threads/webflux")
+ public Flux getThreadsWebflux() {
+ return Flux.fromIterable(getThreads());
+ }
+
+ @GetMapping("/threads/webclient")
+ public Flux getThreadsWebClient() {
+ WebClient.create("http://localhost:8080/index")
+ .get()
+ .retrieve()
+ .bodyToMono(String.class)
+ .subscribeOn(scheduler)
+ .publishOn(scheduler)
+ .doOnNext(s -> logger.info("Response: {}", s))
+ .subscribe();
+ return Flux.fromIterable(getThreads());
+ }
+
+ @GetMapping("/threads/rxjava")
+ public Observable getIndexRxJava() {
+ Observable.fromIterable(Arrays.asList("Hello", "World"))
+ .map(s -> s.toUpperCase())
+ .observeOn(io.reactivex.schedulers.Schedulers.trampoline())
+ .doOnNext(s -> logger.info("String: {}", s))
+ .subscribe();
+ return Observable.fromIterable(getThreads());
+ }
+
+ @GetMapping("/threads/mongodb")
+ public Flux getIndexMongo() {
+ personRepository.findAll()
+ .doOnNext(p -> logger.info("Person: {}", p))
+ .subscribe();
+ return Flux.fromIterable(getThreads());
+ }
+
+ @GetMapping("/threads/reactor-kafka")
+ public Flux getIndexKafka() {
+ Map producerProps = new HashMap<>();
+ producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
+ producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ SenderOptions senderOptions = SenderOptions.create(producerProps);
+ KafkaSender sender = KafkaSender.create(senderOptions);
+ Flux> outboundFlux = Flux.range(1, 10)
+ .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i));
+ sender.send(outboundFlux)
+ .subscribe();
+
+ Map consumerProps = new HashMap<>();
+ consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
+ consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer");
+ consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
+ consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
+ consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps);
+ receiverOptions.subscription(Collections.singleton("reactive-test"));
+ KafkaReceiver receiver = KafkaReceiver.create(receiverOptions);
+ Flux> inboundFlux = receiver.receive();
+ inboundFlux.subscribe(r -> {
+ logger.info("Received message: {}", r.value());
+ r.receiverOffset()
+ .acknowledge();
+ });
+ return Flux.fromIterable(getThreads());
+ }
+
+ @GetMapping("/index")
+ public Mono getIndex() {
+ return Mono.just("Hello world!");
+ }
+
+ private List getThreads() {
+ return Thread.getAllStackTraces()
+ .keySet()
+ .stream()
+ .map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal"))
+ .collect(Collectors.toList());
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java
new file mode 100644
index 0000000000..10029330af
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java
@@ -0,0 +1,27 @@
+package com.baeldung.reactive.concurrency;
+
+import org.springframework.data.annotation.Id;
+import org.springframework.data.mongodb.core.mapping.Document;
+
+@Document
+public class Person {
+ @Id
+ String id;
+
+ public Person(String id) {
+ this.id = id;
+ }
+
+ public String getId() {
+ return id;
+ }
+
+ public void setId(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public String toString() {
+ return "Person{" + "id='" + id + '\'' + '}';
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java
new file mode 100644
index 0000000000..221ea3d74d
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java
@@ -0,0 +1,6 @@
+package com.baeldung.reactive.concurrency;
+
+import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
+
+public interface PersonRepository extends ReactiveMongoRepository {
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java
new file mode 100644
index 0000000000..fa10383c95
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java
@@ -0,0 +1,33 @@
+package com.baeldung.reactive.debugging.consumer;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration;
+import org.springframework.context.annotation.Bean;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+import reactor.core.publisher.Hooks;
+
+import java.util.Collections;
+
+@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class)
+@EnableScheduling
+public class ConsumerDebuggingApplication {
+
+ public static void main(String[] args) {
+ Hooks.onOperatorDebug();
+ SpringApplication app = new SpringApplication(ConsumerDebuggingApplication.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", "8082"));
+ app.run(args);
+ }
+
+ @Bean
+ public SecurityWebFilterChain debuggingConsumerSpringSecurityFilterChain(ServerHttpSecurity http) {
+ http.authorizeExchange()
+ .anyExchange()
+ .permitAll();
+ http.csrf().disable();
+ return http.build();
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java
new file mode 100644
index 0000000000..b58648ff9d
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java
@@ -0,0 +1,152 @@
+package com.baeldung.reactive.debugging.consumer.chronjobs;
+
+import com.baeldung.reactive.debugging.consumer.model.Foo;
+import com.baeldung.reactive.debugging.consumer.model.FooDto;
+import com.baeldung.reactive.debugging.consumer.service.FooService;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.http.MediaType;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+@Component
+public class ChronJobs {
+
+ private static Logger logger = LoggerFactory.getLogger(ChronJobs.class);
+ private WebClient client = WebClient.create("http://localhost:8081");
+
+ @Autowired
+ private FooService service;
+
+ @Scheduled(fixedRate = 10000)
+ public void consumeInfiniteFlux() {
+ Flux fluxFoo = client.get()
+ .uri("/functional-reactive/periodic-foo")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(FooDto.class)
+ .delayElements(Duration.ofMillis(100))
+ .map(dto -> {
+ logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName());
+ return new Foo(dto);
+ });
+ Integer random = ThreadLocalRandom.current()
+ .nextInt(0, 3);
+ switch (random) {
+ case 0:
+ logger.info("process 1 with approach 1");
+ service.processFoo(fluxFoo);
+ break;
+ case 1:
+ logger.info("process 1 with approach 1 EH");
+ service.processUsingApproachOneWithErrorHandling(fluxFoo);
+ break;
+ default:
+ logger.info("process 1 with approach 2");
+ service.processFooInAnotherScenario(fluxFoo);
+ break;
+
+ }
+ }
+
+ @Scheduled(fixedRate = 20000)
+ public void consumeFiniteFlux2() {
+ Flux fluxFoo = client.get()
+ .uri("/functional-reactive/periodic-foo-2")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(FooDto.class)
+ .delayElements(Duration.ofMillis(100))
+ .map(dto -> {
+ logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName());
+ return new Foo(dto);
+ });
+ Integer random = ThreadLocalRandom.current()
+ .nextInt(0, 3);
+ switch (random) {
+ case 0:
+ logger.info("process 2 with approach 1");
+ service.processFoo(fluxFoo);
+ break;
+ case 1:
+ logger.info("process 2 with approach 1 EH");
+ service.processUsingApproachOneWithErrorHandling(fluxFoo);
+ break;
+ default:
+ logger.info("process 2 with approach 2");
+ service.processFooInAnotherScenario(fluxFoo);
+ break;
+
+ }
+ }
+
+ @Scheduled(fixedRate = 20000)
+ public void consumeFiniteFlux3() {
+ Flux fluxFoo = client.get()
+ .uri("/functional-reactive/periodic-foo-2")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(FooDto.class)
+ .delayElements(Duration.ofMillis(100))
+ .map(dto -> {
+ logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName());
+ return new Foo(dto);
+ });
+ logger.info("process 3 with approach 3");
+ service.processUsingApproachThree(fluxFoo);
+ }
+
+ @Scheduled(fixedRate = 20000)
+ public void consumeFiniteFluxWithCheckpoint4() {
+ Flux fluxFoo = client.get()
+ .uri("/functional-reactive/periodic-foo-2")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(FooDto.class)
+ .delayElements(Duration.ofMillis(100))
+ .map(dto -> {
+ logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName());
+ return new Foo(dto);
+ });
+ logger.info("process 4 with approach 4");
+ service.processUsingApproachFourWithCheckpoint(fluxFoo);
+ }
+
+ @Scheduled(fixedRate = 20000)
+ public void consumeFiniteFluxWitParallelScheduler() {
+ Flux fluxFoo = client.get()
+ .uri("/functional-reactive/periodic-foo-2")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(FooDto.class)
+ .delayElements(Duration.ofMillis(100))
+ .map(dto -> {
+ logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName());
+ return new Foo(dto);
+ });
+ logger.info("process 5-parallel with approach 5-parallel");
+ service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo);
+ }
+
+ @Scheduled(fixedRate = 20000)
+ public void consumeFiniteFluxWithSingleSchedulers() {
+ Flux fluxFoo = client.get()
+ .uri("/functional-reactive/periodic-foo-2")
+ .accept(MediaType.TEXT_EVENT_STREAM)
+ .retrieve()
+ .bodyToFlux(FooDto.class)
+ .delayElements(Duration.ofMillis(100))
+ .map(dto -> {
+ logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName());
+ return new Foo(dto);
+ });
+ logger.info("process 5-single with approach 5-single");
+ service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo);
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java
new file mode 100644
index 0000000000..df13113a82
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java
@@ -0,0 +1,22 @@
+package com.baeldung.reactive.debugging.consumer.controllers;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Hooks;
+
+@RestController
+public class ReactiveConfigsToggleRestController {
+
+ @GetMapping("/debug-hook-on")
+ public String setReactiveDebugOn() {
+ Hooks.onOperatorDebug();
+ return "DEBUG HOOK ON";
+ }
+
+ @GetMapping("/debug-hook-off")
+ public String setReactiveDebugOff() {
+ Hooks.resetOnOperatorDebug();
+ return "DEBUG HOOK OFF";
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java
new file mode 100644
index 0000000000..d20e2c9ba0
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java
@@ -0,0 +1,27 @@
+package com.baeldung.reactive.debugging.consumer.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class Foo {
+
+ private Integer id;
+ private String formattedName;
+ private Integer quantity;
+
+ public Foo(FooDto dto) {
+ this.id = (ThreadLocalRandom.current()
+ .nextInt(0, 100) == 0) ? null : dto.getId();
+ this.formattedName = dto.getName();
+ this.quantity = ThreadLocalRandom.current()
+ .nextInt(0, 10);
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java
new file mode 100644
index 0000000000..bf6f614e18
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java
@@ -0,0 +1,16 @@
+package com.baeldung.reactive.debugging.consumer.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class FooDto {
+
+ private Integer id;
+ private String name;
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java
new file mode 100644
index 0000000000..cdd9ca31a6
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java
@@ -0,0 +1,44 @@
+package com.baeldung.reactive.debugging.consumer.service;
+
+import com.baeldung.reactive.debugging.consumer.model.Foo;
+import reactor.core.publisher.Flux;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class FooNameHelper {
+
+ public static Flux concatAndSubstringFooName(Flux flux) {
+ flux = concatFooName(flux);
+ flux = substringFooName(flux);
+ return flux;
+ }
+
+ public static Flux concatFooName(Flux flux) {
+ flux = flux.map(foo -> {
+ String processedName = null;
+ Integer random = ThreadLocalRandom.current()
+ .nextInt(0, 80);
+ processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael";
+ foo.setFormattedName(processedName);
+ return foo;
+ });
+ return flux;
+ }
+
+ public static Flux substringFooName(Flux flux) {
+ return flux.map(foo -> {
+ String processedName;
+ Integer random = ThreadLocalRandom.current()
+ .nextInt(0, 100);
+
+ processedName = (random == 0) ? foo.getFormattedName()
+ .substring(10, 15)
+ : foo.getFormattedName()
+ .substring(0, 5);
+
+ foo.setFormattedName(processedName);
+ return foo;
+ });
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java
new file mode 100644
index 0000000000..f4600b41b9
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java
@@ -0,0 +1,30 @@
+package com.baeldung.reactive.debugging.consumer.service;
+
+import com.baeldung.reactive.debugging.consumer.model.Foo;
+import reactor.core.publisher.Flux;
+
+import java.util.concurrent.ThreadLocalRandom;
+
+public class FooQuantityHelper {
+
+ public static Flux processFooReducingQuantity(Flux flux) {
+ flux = flux.map(foo -> {
+ Integer result;
+ Integer random = ThreadLocalRandom.current()
+ .nextInt(0, 90);
+ result = (random == 0) ? result = 0 : foo.getQuantity() + 2;
+ foo.setQuantity(result);
+ return foo;
+ });
+ return divideFooQuantity(flux);
+ }
+
+ public static Flux divideFooQuantity(Flux flux) {
+ return flux.map(foo -> {
+ Integer result = Math.round(5 / foo.getQuantity());
+ foo.setQuantity(result);
+ return foo;
+ });
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java
new file mode 100644
index 0000000000..1a8f9bc783
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java
@@ -0,0 +1,24 @@
+package com.baeldung.reactive.debugging.consumer.service;
+
+import com.baeldung.reactive.debugging.consumer.model.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.Flux;
+
+public class FooReporter {
+
+ private static Logger logger = LoggerFactory.getLogger(FooReporter.class);
+
+ public static Flux reportResult(Flux input, String approach) {
+ return input.map(foo -> {
+ if (foo.getId() == null)
+ throw new IllegalArgumentException("Null id is not valid!");
+ logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity());
+ return foo;
+ });
+ }
+
+ public static Flux reportResult(Flux input) {
+ return reportResult(input, "default");
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java
new file mode 100644
index 0000000000..bafaa3cfa0
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java
@@ -0,0 +1,118 @@
+package com.baeldung.reactive.debugging.consumer.service;
+
+import com.baeldung.reactive.debugging.consumer.model.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.stereotype.Component;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName;
+import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.substringFooName;
+import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.divideFooQuantity;
+import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity;
+import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult;
+
+@Component
+public class FooService {
+
+ private static Logger logger = LoggerFactory.getLogger(FooService.class);
+
+ public void processFoo(Flux flux) {
+ flux = FooNameHelper.concatFooName(flux);
+ flux = FooNameHelper.substringFooName(flux);
+ flux = flux.log();
+ flux = FooReporter.reportResult(flux);
+ flux = flux.doOnError(error -> {
+ logger.error("The following error happened on processFoo method!", error);
+ });
+ flux.subscribe();
+ }
+
+ public void processFooInAnotherScenario(Flux flux) {
+ flux = FooNameHelper.substringFooName(flux);
+ flux = FooQuantityHelper.divideFooQuantity(flux);
+ flux.subscribe();
+ }
+
+ public void processUsingApproachOneWithErrorHandling(Flux flux) {
+ logger.info("starting approach one w error handling!");
+ flux = concatAndSubstringFooName(flux);
+ flux = concatAndSubstringFooName(flux);
+ flux = substringFooName(flux);
+ flux = processFooReducingQuantity(flux);
+ flux = processFooReducingQuantity(flux);
+ flux = processFooReducingQuantity(flux);
+ flux = reportResult(flux, "ONE w/ EH");
+ flux = flux.doOnError(error -> {
+ logger.error("Approach 1 with Error Handling failed!", error);
+ });
+ flux.subscribe();
+ }
+
+ public void processUsingApproachThree(Flux flux) {
+ logger.info("starting approach three!");
+ flux = concatAndSubstringFooName(flux);
+ flux = reportResult(flux, "THREE");
+ flux = flux.doOnError(error -> {
+ logger.error("Approach 3 failed!", error);
+ });
+ flux.subscribe();
+ }
+
+ public void processUsingApproachFourWithCheckpoint(Flux flux) {
+ logger.info("starting approach four!");
+ flux = concatAndSubstringFooName(flux);
+ flux = flux.checkpoint("CHECKPOINT 1");
+ flux = concatAndSubstringFooName(flux);
+ flux = divideFooQuantity(flux);
+ flux = flux.checkpoint("CHECKPOINT 2", true);
+ flux = reportResult(flux, "FOUR");
+ flux = concatAndSubstringFooName(flux).doOnError(error -> {
+ logger.error("Approach 4 failed!", error);
+ });
+ flux.subscribe();
+ }
+
+ public void processUsingApproachFourWithInitialCheckpoint(Flux flux) {
+ logger.info("starting approach four!");
+ flux = concatAndSubstringFooName(flux);
+ flux = flux.checkpoint("CHECKPOINT 1", true);
+ flux = concatAndSubstringFooName(flux);
+ flux = divideFooQuantity(flux);
+ flux = reportResult(flux, "FOUR");
+ flux = flux.doOnError(error -> {
+ logger.error("Approach 4-2 failed!", error);
+ });
+ flux.subscribe();
+ }
+
+ public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux flux) {
+ logger.info("starting approach five-parallel!");
+ flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newParallel("five-parallel-foo"))
+ .log();
+ flux = concatAndSubstringFooName(flux);
+ flux = divideFooQuantity(flux);
+ flux = reportResult(flux, "FIVE-PARALLEL").publishOn(Schedulers.newSingle("five-parallel-bar"));
+ flux = concatAndSubstringFooName(flux).doOnError(error -> {
+ logger.error("Approach 5-parallel failed!", error);
+ });
+ flux.subscribeOn(Schedulers.newParallel("five-parallel-starter"))
+ .subscribe();
+ }
+
+ public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux flux) {
+ logger.info("starting approach five-single!");
+ flux = flux.log()
+ .subscribeOn(Schedulers.newSingle("five-single-starter"));
+ flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newSingle("five-single-foo"));
+ flux = concatAndSubstringFooName(flux);
+ flux = divideFooQuantity(flux);
+ flux = reportResult(flux, "FIVE-SINGLE").publishOn(Schedulers.newSingle("five-single-bar"));
+ flux = concatAndSubstringFooName(flux).doOnError(error -> {
+ logger.error("Approach 5-single failed!", error);
+ });
+ flux.subscribe();
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java
new file mode 100644
index 0000000000..c9838705b5
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java
@@ -0,0 +1,29 @@
+package com.baeldung.reactive.debugging.server;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+import org.springframework.web.reactive.config.EnableWebFlux;
+
+import java.util.Collections;
+
+@EnableWebFlux
+@SpringBootApplication
+public class ServerDebuggingApplication {
+
+ public static void main(String[] args) {
+ SpringApplication app = new SpringApplication(ServerDebuggingApplication.class);
+ app.setDefaultProperties(Collections.singletonMap("server.port", "8081"));
+ app.run(args);
+ }
+
+ @Bean
+ public SecurityWebFilterChain debuggingServerSpringSecurityFilterChain(ServerHttpSecurity http) {
+ http.authorizeExchange()
+ .anyExchange()
+ .permitAll();
+ return http.build();
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java
new file mode 100644
index 0000000000..15f9a4b786
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java
@@ -0,0 +1,45 @@
+package com.baeldung.reactive.debugging.server.handlers;
+
+import com.baeldung.reactive.debugging.server.model.Foo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.time.Duration;
+import java.util.concurrent.ThreadLocalRandom;
+
+@Component
+public class ServerHandler {
+
+ private static Logger logger = LoggerFactory.getLogger(ServerHandler.class);
+
+ public Mono useHandler(final ServerRequest request) {
+ // there are chances that something goes wrong here...
+ return ServerResponse.ok()
+ .contentType(MediaType.TEXT_EVENT_STREAM)
+ .body(Flux.interval(Duration.ofSeconds(1))
+ .map(sequence -> {
+ logger.info("retrieving Foo. Sequence: {}", sequence);
+ if (ThreadLocalRandom.current()
+ .nextInt(0, 50) == 1) {
+ throw new RuntimeException("There was an error retrieving the Foo!");
+ }
+ return new Foo(sequence, "name" + sequence);
+
+ }), Foo.class);
+ }
+
+ public Mono useHandlerFinite(final ServerRequest request) {
+ return ServerResponse.ok()
+ .contentType(MediaType.TEXT_EVENT_STREAM)
+ .body(Flux.range(0, 50)
+ .map(sequence -> {
+ return new Foo(new Long(sequence), "theFooNameNumber" + sequence);
+ }), Foo.class);
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java
new file mode 100644
index 0000000000..2c419a23f8
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java
@@ -0,0 +1,13 @@
+package com.baeldung.reactive.debugging.server.model;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+
+@Data
+@AllArgsConstructor
+public class Foo {
+
+ private Long id;
+ private String name;
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java
new file mode 100644
index 0000000000..5db2ab92b6
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.debugging.server.routers;
+
+import com.baeldung.reactive.debugging.server.handlers.ServerHandler;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@Configuration
+public class ServerRouter {
+
+ @Bean
+ public RouterFunction responseRoute(@Autowired ServerHandler handler) {
+ return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler)
+ .andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java
new file mode 100644
index 0000000000..a22f04a8d2
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java
@@ -0,0 +1,24 @@
+package com.baeldung.reactive.errorhandling;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.context.annotation.Bean;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+
+@SpringBootApplication
+public class ErrorHandlingApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(ErrorHandlingApplication.class, args);
+ }
+
+ @Bean
+ public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) {
+ http.authorizeExchange()
+ .anyExchange()
+ .permitAll();
+ http.csrf().disable();
+ return http.build();
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java
new file mode 100644
index 0000000000..0a96a8593c
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java
@@ -0,0 +1,52 @@
+package com.baeldung.reactive.errorhandling;
+
+import org.springframework.boot.web.error.ErrorAttributeOptions;
+import org.springframework.boot.web.reactive.error.DefaultErrorAttributes;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+
+import java.util.Map;
+
+@Component
+public class GlobalErrorAttributes extends DefaultErrorAttributes{
+
+ private HttpStatus status = HttpStatus.BAD_REQUEST;
+ private String message = "please provide a name";
+
+ @Override
+ public Map getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) {
+ Map map = super.getErrorAttributes(request, options);
+ map.put("status", getStatus());
+ map.put("message", getMessage());
+ return map;
+ }
+
+ /**
+ * @return the status
+ */
+ public HttpStatus getStatus() {
+ return status;
+ }
+
+ /**
+ * @param status the status to set
+ */
+ public void setStatus(HttpStatus status) {
+ this.status = status;
+ }
+
+ /**
+ * @return the message
+ */
+ public String getMessage() {
+ return message;
+ }
+
+ /**
+ * @param message the message to set
+ */
+ public void setMessage(String message) {
+ this.message = message;
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java
new file mode 100644
index 0000000000..24583308cd
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java
@@ -0,0 +1,48 @@
+package com.baeldung.reactive.errorhandling;
+
+import org.springframework.boot.autoconfigure.web.WebProperties;
+import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler;
+import org.springframework.boot.web.error.ErrorAttributeOptions;
+import org.springframework.boot.web.reactive.error.ErrorAttributes;
+import org.springframework.context.ApplicationContext;
+import org.springframework.core.annotation.Order;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.codec.ServerCodecConfigurer;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+import java.util.Map;
+
+@Component
+@Order(-2)
+public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler {
+
+ public GlobalErrorWebExceptionHandler(GlobalErrorAttributes g, ApplicationContext applicationContext,
+ ServerCodecConfigurer serverCodecConfigurer) {
+ super(g, new WebProperties.Resources(), applicationContext);
+ super.setMessageWriters(serverCodecConfigurer.getWriters());
+ super.setMessageReaders(serverCodecConfigurer.getReaders());
+ }
+
+ @Override
+ protected RouterFunction getRoutingFunction(final ErrorAttributes errorAttributes) {
+ return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse);
+ }
+
+ private Mono renderErrorResponse(final ServerRequest request) {
+
+ final Map errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults());
+
+ return ServerResponse.status(HttpStatus.BAD_REQUEST)
+ .contentType(MediaType.APPLICATION_JSON)
+ .body(BodyInserters.fromValue(errorPropertiesMap));
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java
new file mode 100644
index 0000000000..bdc7771b80
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java
@@ -0,0 +1,11 @@
+package com.baeldung.reactive.errorhandling;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.web.server.ResponseStatusException;
+
+public class NameRequiredException extends ResponseStatusException {
+
+ public NameRequiredException(HttpStatus status, String message, Throwable e) {
+ super(status, message, e);
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java
new file mode 100644
index 0000000000..32f2f1c3a2
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java
@@ -0,0 +1,28 @@
+package com.baeldung.reactive.errorhandling.handlers;
+
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+@Component
+public class Handler1 {
+
+ public Mono handleRequest1(ServerRequest request) {
+ return sayHello(request).onErrorReturn("Hello, Stranger")
+ .flatMap(s -> ServerResponse.ok()
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(s));
+ }
+
+ private Mono sayHello(ServerRequest request) {
+ try {
+ return Mono.just("Hello, " + request.queryParam("name")
+ .get());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java
new file mode 100644
index 0000000000..093120c92b
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java
@@ -0,0 +1,37 @@
+package com.baeldung.reactive.errorhandling.handlers;
+
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+@Component
+public class Handler2 {
+
+public Mono handleRequest2(ServerRequest request) {
+ return
+ sayHello(request)
+ .flatMap(s -> ServerResponse.ok()
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(s))
+ .onErrorResume(e -> sayHelloFallback()
+ .flatMap(s -> ServerResponse.ok()
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(s)));
+}
+
+ private Mono sayHello(ServerRequest request) {
+ try {
+ return Mono.just("Hello, " + request.queryParam("name")
+ .get());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+ private Mono sayHelloFallback() {
+ return Mono.just("Hello, Stranger");
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java
new file mode 100644
index 0000000000..44842e0539
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java
@@ -0,0 +1,33 @@
+package com.baeldung.reactive.errorhandling.handlers;
+
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+@Component
+public class Handler3 {
+
+ public Mono handleRequest3(ServerRequest request) {
+ return
+ sayHello(request)
+ .flatMap(s -> ServerResponse.ok()
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(s))
+ .onErrorResume(e -> (Mono.just("Hi, I looked around for your name but found: " +
+ e.getMessage())).flatMap(s -> ServerResponse.ok()
+ .contentType(MediaType.TEXT_PLAIN)
+ .bodyValue(s)));
+ }
+
+ private Mono sayHello(ServerRequest request) {
+ try {
+ return Mono.just("Hello, " + request.queryParam("name")
+ .get());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java
new file mode 100644
index 0000000000..2d391a42a7
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java
@@ -0,0 +1,29 @@
+package com.baeldung.reactive.errorhandling.handlers;
+
+import com.baeldung.reactive.errorhandling.NameRequiredException;
+import org.springframework.http.HttpStatus;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+@Component
+public class Handler4 {
+
+public Mono handleRequest4(ServerRequest request) {
+ return ServerResponse.ok()
+ .body(sayHello(request)
+ .onErrorResume(e ->
+ Mono.error(new NameRequiredException(
+ HttpStatus.BAD_REQUEST, "please provide a name", e))), String.class);
+}
+
+ private Mono sayHello(ServerRequest request) {
+ try {
+ return Mono.just("Hello, " + request.queryParam("name").get());
+ } catch (Exception e) {
+ return Mono.error(e);
+ }
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java
new file mode 100644
index 0000000000..a466982865
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.errorhandling.handlers;
+
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.ServerRequest;
+import org.springframework.web.reactive.function.server.ServerResponse;
+import reactor.core.publisher.Mono;
+
+@Component
+public class Handler5 {
+
+ public Mono handleRequest5(ServerRequest request) {
+ return ServerResponse.ok()
+ .body(sayHello(request), String.class);
+
+ }
+
+ private Mono sayHello(ServerRequest request) {
+ return Mono.just("Hello, " + request.queryParam("name").get());
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java
new file mode 100644
index 0000000000..caf779b456
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.errorhandling.routers;
+
+import com.baeldung.reactive.errorhandling.handlers.Handler1;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@Component
+public class Router1 {
+
+ @Bean
+ public RouterFunction routeRequest1(Handler1 handler) {
+ return RouterFunctions.route(RequestPredicates.GET("/api/endpoint1")
+ .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest1);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java
new file mode 100644
index 0000000000..b965257c30
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.errorhandling.routers;
+
+import com.baeldung.reactive.errorhandling.handlers.Handler2;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@Component
+public class Router2 {
+
+ @Bean
+ public RouterFunction routeRequest2(Handler2 handler) {
+ return RouterFunctions.route(RequestPredicates.GET("/api/endpoint2")
+ .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest2);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java
new file mode 100644
index 0000000000..b8f7f983cc
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.errorhandling.routers;
+
+import com.baeldung.reactive.errorhandling.handlers.Handler3;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@Component
+public class Router3 {
+
+ @Bean
+ public RouterFunction routeRequest3(Handler3 handler) {
+ return RouterFunctions.route(RequestPredicates.GET("/api/endpoint3")
+ .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest3);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java
new file mode 100644
index 0000000000..03c65fec67
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.errorhandling.routers;
+
+import com.baeldung.reactive.errorhandling.handlers.Handler4;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@Component
+public class Router4 {
+
+ @Bean
+ public RouterFunction routeRequest4(Handler4 handler) {
+ return RouterFunctions.route(RequestPredicates.GET("/api/endpoint4")
+ .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest4);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java
new file mode 100644
index 0000000000..c68e04659f
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java
@@ -0,0 +1,21 @@
+package com.baeldung.reactive.errorhandling.routers;
+
+import com.baeldung.reactive.errorhandling.handlers.Handler5;
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.MediaType;
+import org.springframework.stereotype.Component;
+import org.springframework.web.reactive.function.server.RequestPredicates;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.RouterFunctions;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+@Component
+public class Router5 {
+
+ @Bean
+ public RouterFunction routeRequest5(Handler5 handler) {
+ return RouterFunctions.route(RequestPredicates.GET("/api/endpoint5")
+ .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest5);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java
new file mode 100644
index 0000000000..99b79d88ea
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java
@@ -0,0 +1,37 @@
+package com.baeldung.reactive.security;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+import java.security.Principal;
+
+@RestController
+public class GreetController {
+
+ private GreetService greetService;
+
+ public GreetController(GreetService greetService) {
+ this.greetService = greetService;
+ }
+
+ @GetMapping("/")
+ public Mono greet(Mono principal) {
+ return principal
+ .map(Principal::getName)
+ .map(name -> String.format("Hello, %s", name));
+ }
+
+ @GetMapping("/admin")
+ public Mono greetAdmin(Mono principal) {
+ return principal
+ .map(Principal::getName)
+ .map(name -> String.format("Admin access: %s", name));
+ }
+
+ @GetMapping("/greetService")
+ public Mono greetService() {
+ return greetService.greet();
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java
new file mode 100644
index 0000000000..93df64bced
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java
@@ -0,0 +1,15 @@
+package com.baeldung.reactive.security;
+
+import org.springframework.security.access.prepost.PreAuthorize;
+import org.springframework.stereotype.Service;
+import reactor.core.publisher.Mono;
+
+@Service
+public class GreetService {
+
+ @PreAuthorize("hasRole('ADMIN')")
+ public Mono greet() {
+ return Mono.just("Hello from service!");
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java
new file mode 100644
index 0000000000..bb2f2d50e1
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java
@@ -0,0 +1,55 @@
+package com.baeldung.reactive.security;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity;
+import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
+import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
+import org.springframework.security.crypto.password.PasswordEncoder;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+
+@EnableWebFluxSecurity
+@EnableReactiveMethodSecurity
+public class SecurityConfig {
+
+ @Bean
+ public SecurityWebFilterChain securitygWebFilterChain(ServerHttpSecurity http) {
+ return http.authorizeExchange()
+ .pathMatchers("/admin")
+ .hasAuthority("ROLE_ADMIN")
+ .anyExchange()
+ .authenticated()
+ .and()
+ .formLogin()
+ .and()
+ .csrf()
+ .disable()
+ .build();
+ }
+
+ @Bean
+ public MapReactiveUserDetailsService userDetailsService() {
+ UserDetails user = User
+ .withUsername("user")
+ .password(passwordEncoder().encode("password"))
+ .roles("USER")
+ .build();
+
+ UserDetails admin = User
+ .withUsername("admin")
+ .password(passwordEncoder().encode("password"))
+ .roles("ADMIN")
+ .build();
+
+ return new MapReactiveUserDetailsService(user, admin);
+ }
+
+ @Bean
+ public PasswordEncoder passwordEncoder() {
+ return new BCryptPasswordEncoder();
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java
new file mode 100644
index 0000000000..bc0895a38b
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java
@@ -0,0 +1,34 @@
+package com.baeldung.reactive.security;
+
+import org.springframework.context.ApplicationContext;
+import org.springframework.context.annotation.AnnotationConfigApplicationContext;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.ComponentScan;
+import org.springframework.http.server.reactive.HttpHandler;
+import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter;
+import org.springframework.web.reactive.config.EnableWebFlux;
+import org.springframework.web.server.adapter.WebHttpHandlerBuilder;
+import reactor.netty.DisposableServer;
+import reactor.netty.http.server.HttpServer;
+
+@ComponentScan(basePackages = {"com.baeldung.reactive.security"})
+@EnableWebFlux
+public class SpringSecurity5Application {
+
+ public static void main(String[] args) {
+ try (AnnotationConfigApplicationContext context =
+ new AnnotationConfigApplicationContext(SpringSecurity5Application.class)) {
+ context.getBean(DisposableServer.class).onDispose().block();
+ }
+ }
+
+ @Bean
+ public DisposableServer disposableServer(ApplicationContext context) {
+ HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context)
+ .build();
+ ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler);
+ HttpServer httpServer = HttpServer.create().host("localhost").port(8083);
+ return httpServer.handle(adapter).bindNow();
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java
new file mode 100644
index 0000000000..a58d672686
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java
@@ -0,0 +1,24 @@
+package com.baeldung.reactive.webclient;
+
+public class Foo {
+
+ private String name;
+
+ public Foo() {
+ super();
+ }
+
+ public Foo(String name) {
+ super();
+ this.name = name;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java
new file mode 100644
index 0000000000..4ef1aecabb
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java
@@ -0,0 +1,13 @@
+package com.baeldung.reactive.webclient;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@NoArgsConstructor
+@AllArgsConstructor
+public class Tweet {
+ private String text;
+ private String username;
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java
new file mode 100644
index 0000000000..42ff603b87
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java
@@ -0,0 +1,20 @@
+package com.baeldung.reactive.webclient;
+
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RestController
+public class TweetsSlowServiceController {
+
+ @GetMapping("/slow-service-tweets")
+ private List getAllTweets() throws Exception {
+ Thread.sleep(2000L); // delay
+ return Arrays.asList(
+ new Tweet("RestTemplate rules", "@user1"),
+ new Tweet("WebClient is better", "@user2"),
+ new Tweet("OK, both are useful", "@user1"));
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java
new file mode 100644
index 0000000000..d2a5b6a024
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java
@@ -0,0 +1,14 @@
+package com.baeldung.reactive.webclient;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration;
+
+@SpringBootApplication(exclude = { ReactiveSecurityAutoConfiguration.class })
+public class WebClientApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(WebClientApplication.class, args);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java
new file mode 100644
index 0000000000..0a83b1a1b7
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java
@@ -0,0 +1,41 @@
+package com.baeldung.reactive.webclient;
+
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestPart;
+import org.springframework.web.bind.annotation.ResponseStatus;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@RestController
+public class WebClientController {
+
+ @ResponseStatus(HttpStatus.OK)
+ @GetMapping("/resource")
+ public Map getResource() {
+ Map response = new HashMap<>();
+ response.put("field", "value");
+ return response;
+ }
+
+ @PostMapping("/resource")
+ public Mono postStringResource(@RequestBody Mono bodyString) {
+ return bodyString.map(body -> "processed-" + body);
+ }
+
+ @PostMapping("/resource-foo")
+ public Mono postFooResource(@RequestBody Mono bodyFoo) {
+ return bodyFoo.map(foo -> "processedFoo-" + foo.getName());
+ }
+
+ @PostMapping(value = "/resource-multipart", consumes = MediaType.MULTIPART_FORM_DATA_VALUE)
+ public String handleFormUpload(@RequestPart("key1") String value1, @RequestPart("key2") String value2) {
+ return "processed-" + value1 + '-' + value2;
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java
new file mode 100644
index 0000000000..30b3ccc959
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java
@@ -0,0 +1,60 @@
+package com.baeldung.reactive.webclient;
+
+import lombok.Setter;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.MediaType;
+import org.springframework.http.ResponseEntity;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RestController;
+import org.springframework.web.client.RestTemplate;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+
+import java.util.List;
+
+@Slf4j
+@RestController
+public class WebController {
+
+ private static final int DEFAULT_PORT = 8080;
+
+ @Setter
+ private int serverPort = DEFAULT_PORT;
+
+ @GetMapping("/tweets-blocking")
+ public List getTweetsBlocking() {
+ log.info("Starting BLOCKING Controller!");
+ final String uri = getSlowServiceUri();
+
+ RestTemplate restTemplate = new RestTemplate();
+ ResponseEntity> response = restTemplate.exchange(
+ uri, HttpMethod.GET, null,
+ new ParameterizedTypeReference>(){});
+
+ List result = response.getBody();
+ result.forEach(tweet -> log.info(tweet.toString()));
+ log.info("Exiting BLOCKING Controller!");
+ return result;
+ }
+
+ @GetMapping(value = "/tweets-non-blocking", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
+ public Flux getTweetsNonBlocking() {
+ log.info("Starting NON-BLOCKING Controller!");
+ Flux tweetFlux = WebClient.create()
+ .get()
+ .uri(getSlowServiceUri())
+ .retrieve()
+ .bodyToFlux(Tweet.class);
+
+ tweetFlux.subscribe(tweet -> log.info(tweet.toString()));
+ log.info("Exiting NON-BLOCKING Controller!");
+ return tweetFlux;
+ }
+
+ private String getSlowServiceUri() {
+ return "http://localhost:" + serverPort + "/slow-service-tweets";
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java
new file mode 100644
index 0000000000..d43fe43a77
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java
@@ -0,0 +1,12 @@
+package com.baeldung.reactive.webclientrequests;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class SpringWebClientRequestsApp {
+
+ public static void main(String[] args) {
+ SpringApplication.run(SpringWebClientRequestsApp.class, args);
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java
new file mode 100644
index 0000000000..d41a4f2791
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java
@@ -0,0 +1,17 @@
+package com.baeldung.reactive.webflux;
+
+import lombok.AllArgsConstructor;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+
+@Data
+@AllArgsConstructor
+@NoArgsConstructor
+public class Employee {
+
+ private String id;
+ private String name;
+
+ // standard getters and setters
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java
new file mode 100644
index 0000000000..8a57f5c510
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java
@@ -0,0 +1,59 @@
+package com.baeldung.reactive.webflux;
+
+import com.baeldung.reactive.webflux.Employee;
+import org.springframework.stereotype.Repository;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+import java.util.HashMap;
+import java.util.Map;
+
+@Repository
+public class EmployeeRepository {
+
+ static Map employeeData;
+
+ static Map employeeAccessData;
+
+ static {
+ employeeData = new HashMap<>();
+ employeeData.put("1", new Employee("1", "Employee 1"));
+ employeeData.put("2", new Employee("2", "Employee 2"));
+ employeeData.put("3", new Employee("3", "Employee 3"));
+ employeeData.put("4", new Employee("4", "Employee 4"));
+ employeeData.put("5", new Employee("5", "Employee 5"));
+ employeeData.put("6", new Employee("6", "Employee 6"));
+ employeeData.put("7", new Employee("7", "Employee 7"));
+ employeeData.put("8", new Employee("8", "Employee 8"));
+ employeeData.put("9", new Employee("9", "Employee 9"));
+ employeeData.put("10", new Employee("10", "Employee 10"));
+
+ employeeAccessData = new HashMap<>();
+ employeeAccessData.put("1", "Employee 1 Access Key");
+ employeeAccessData.put("2", "Employee 2 Access Key");
+ employeeAccessData.put("3", "Employee 3 Access Key");
+ employeeAccessData.put("4", "Employee 4 Access Key");
+ employeeAccessData.put("5", "Employee 5 Access Key");
+ employeeAccessData.put("6", "Employee 6 Access Key");
+ employeeAccessData.put("7", "Employee 7 Access Key");
+ employeeAccessData.put("8", "Employee 8 Access Key");
+ employeeAccessData.put("9", "Employee 9 Access Key");
+ employeeAccessData.put("10", "Employee 10 Access Key");
+ }
+
+ public Mono findEmployeeById(String id) {
+ return Mono.just(employeeData.get(id));
+ }
+
+ public Flux findAllEmployees() {
+ return Flux.fromIterable(employeeData.values());
+ }
+
+ public Mono updateEmployee(Employee employee) {
+ Employee existingEmployee = employeeData.get(employee.getId());
+ if (existingEmployee != null) {
+ existingEmployee.setName(employee.getName());
+ }
+ return Mono.just(existingEmployee);
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java
new file mode 100644
index 0000000000..23aacfdd95
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java
@@ -0,0 +1,39 @@
+package com.baeldung.reactive.webflux.annotation;
+
+import com.baeldung.reactive.webflux.Employee;
+import com.baeldung.reactive.webflux.EmployeeRepository;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+@RestController
+@RequestMapping("/employees")
+public class EmployeeController {
+
+ private final EmployeeRepository employeeRepository;
+
+ public EmployeeController(EmployeeRepository employeeRepository) {
+ this.employeeRepository = employeeRepository;
+ }
+
+ @GetMapping("/{id}")
+ private Mono getEmployeeById(@PathVariable String id) {
+ return employeeRepository.findEmployeeById(id);
+ }
+
+ @GetMapping
+ private Flux getAllEmployees() {
+ return employeeRepository.findAllEmployees();
+ }
+
+ @PostMapping("/update")
+ private Mono updateEmployee(@RequestBody Employee employee) {
+ return employeeRepository.updateEmployee(employee);
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java
new file mode 100644
index 0000000000..1a2ada96e9
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java
@@ -0,0 +1,16 @@
+package com.baeldung.reactive.webflux.annotation;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class EmployeeSpringApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(EmployeeSpringApplication.class, args);
+
+ EmployeeWebClient employeeWebClient = new EmployeeWebClient();
+ employeeWebClient.consume();
+ }
+
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java
new file mode 100644
index 0000000000..611a261a1b
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java
@@ -0,0 +1,32 @@
+package com.baeldung.reactive.webflux.annotation;
+
+import com.baeldung.reactive.webflux.Employee;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.web.reactive.function.client.WebClient;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class EmployeeWebClient {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeWebClient.class);
+
+ WebClient client = WebClient.create("http://localhost:8080");
+
+ public void consume() {
+
+ Mono employeeMono = client.get()
+ .uri("/employees/{id}", "1")
+ .retrieve()
+ .bodyToMono(Employee.class);
+
+ employeeMono.subscribe(employee -> LOGGER.debug("Employee: {}", employee));
+
+ Flux employeeFlux = client.get()
+ .uri("/employees")
+ .retrieve()
+ .bodyToFlux(Employee.class);
+
+ employeeFlux.subscribe(employee -> LOGGER.debug("Employee: {}", employee));
+ }
+}
\ No newline at end of file
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java
new file mode 100644
index 0000000000..8dfa455ce3
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java
@@ -0,0 +1,45 @@
+package com.baeldung.reactive.webflux.annotation;
+
+import org.springframework.context.annotation.Bean;
+import org.springframework.http.HttpMethod;
+import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.core.userdetails.MapReactiveUserDetailsService;
+import org.springframework.security.core.userdetails.User;
+import org.springframework.security.core.userdetails.UserDetails;
+import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder;
+import org.springframework.security.crypto.password.PasswordEncoder;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+
+@EnableWebFluxSecurity
+public class EmployeeWebSecurityConfig {
+
+ @Bean
+ public MapReactiveUserDetailsService userDetailsService() {
+ UserDetails user = User
+ .withUsername("admin")
+ .password(passwordEncoder().encode("password"))
+ .roles("ADMIN")
+ .build();
+ return new MapReactiveUserDetailsService(user);
+ }
+
+ @Bean
+ public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
+ http.csrf()
+ .disable()
+ .authorizeExchange()
+ .pathMatchers(HttpMethod.POST, "/employees/update")
+ .hasRole("ADMIN")
+ .pathMatchers("/**")
+ .permitAll()
+ .and()
+ .httpBasic();
+ return http.build();
+ }
+
+ @Bean
+ public PasswordEncoder passwordEncoder() {
+ return new BCryptPasswordEncoder();
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java
new file mode 100644
index 0000000000..f97d40e4e7
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java
@@ -0,0 +1,74 @@
+package com.baeldung.reactive.webflux.functional;
+
+import com.baeldung.reactive.webflux.Employee;
+import com.baeldung.reactive.webflux.EmployeeRepository;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.security.config.web.server.ServerHttpSecurity;
+import org.springframework.security.web.server.SecurityWebFilterChain;
+import org.springframework.web.reactive.function.server.RouterFunction;
+import org.springframework.web.reactive.function.server.ServerResponse;
+
+import static org.springframework.web.reactive.function.BodyExtractors.toMono;
+import static org.springframework.web.reactive.function.server.RequestPredicates.GET;
+import static org.springframework.web.reactive.function.server.RequestPredicates.POST;
+import static org.springframework.web.reactive.function.server.RouterFunctions.route;
+import static org.springframework.web.reactive.function.server.ServerResponse.ok;
+
+@Configuration
+public class EmployeeFunctionalConfig {
+
+ @Bean
+ EmployeeRepository employeeRepository() {
+ return new EmployeeRepository();
+ }
+
+ @Bean
+ RouterFunction getAllEmployeesRoute() {
+ return route(GET("/employees"),
+ req -> ok().body(
+ employeeRepository().findAllEmployees(), Employee.class));
+ }
+
+ @Bean
+ RouterFunction getEmployeeByIdRoute() {
+ return route(GET("/employees/{id}"),
+ req -> ok().body(
+ employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class));
+ }
+
+ @Bean
+ RouterFunction updateEmployeeRoute() {
+ return route(POST("/employees/update"),
+ req -> req.body(toMono(Employee.class))
+ .doOnNext(employeeRepository()::updateEmployee)
+ .then(ok().build()));
+ }
+
+ @Bean
+ RouterFunction composedRoutes() {
+ return
+ route(GET("/employees"),
+ req -> ok().body(
+ employeeRepository().findAllEmployees(), Employee.class))
+
+ .and(route(GET("/employees/{id}"),
+ req -> ok().body(
+ employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class)))
+
+ .and(route(POST("/employees/update"),
+ req -> req.body(toMono(Employee.class))
+ .doOnNext(employeeRepository()::updateEmployee)
+ .then(ok().build())));
+ }
+
+ @Bean
+ public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) {
+ http.csrf()
+ .disable()
+ .authorizeExchange()
+ .anyExchange()
+ .permitAll();
+ return http.build();
+ }
+}
diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java
new file mode 100644
index 0000000000..f710cf44eb
--- /dev/null
+++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java
@@ -0,0 +1,13 @@
+package com.baeldung.reactive.webflux.functional;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+
+@SpringBootApplication
+public class EmployeeSpringFunctionalApplication {
+
+ public static void main(String[] args) {
+ SpringApplication.run(EmployeeSpringFunctionalApplication.class, args);
+ }
+
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java
new file mode 100644
index 0000000000..5cb1a69fbd
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java
@@ -0,0 +1,62 @@
+package com.baeldung.reactive.debugging.consumer;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.classic.spi.IThrowableProxy;
+import com.baeldung.reactive.debugging.consumer.model.Foo;
+import com.baeldung.reactive.debugging.consumer.service.FooService;
+import com.baeldung.reactive.debugging.consumer.utils.ListAppender;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Hooks;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ConsumerFooServiceIntegrationTest {
+
+ FooService service = new FooService();
+
+ @BeforeEach
+ public void clearLogList() {
+ Hooks.onOperatorDebug();
+ ListAppender.clearEventList();
+ }
+
+ @Test
+ public void givenFooWithNullId_whenProcessFoo_thenLogsWithDebugTrace() {
+ Foo one = new Foo(1, "nameverylong", 8);
+ Foo two = new Foo(null, "nameverylong", 4);
+ Flux flux = Flux.just(one, two);
+
+ service.processFoo(flux);
+
+ Collection allLoggedEntries = ListAppender.getEvents()
+ .stream()
+ .map(ILoggingEvent::getFormattedMessage)
+ .collect(Collectors.toList());
+
+ Collection allSuppressedEntries = ListAppender.getEvents()
+ .stream()
+ .map(ILoggingEvent::getThrowableProxy)
+ .flatMap(t -> {
+ return Optional.ofNullable(t)
+ .map(IThrowableProxy::getSuppressed)
+ .map(Arrays::stream)
+ .orElse(Stream.empty());
+ })
+ .map(IThrowableProxy::getClassName)
+ .collect(Collectors.toList());
+ assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!"))
+ .anyMatch(entry -> entry.contains("| onSubscribe"))
+ .anyMatch(entry -> entry.contains("| cancel()"));
+
+ assertThat(allSuppressedEntries)
+ .anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException"));
+ }
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java
new file mode 100644
index 0000000000..c3ef67f534
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java
@@ -0,0 +1,53 @@
+package com.baeldung.reactive.debugging.consumer;
+
+import com.baeldung.reactive.debugging.consumer.service.FooService;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Test;
+import org.springframework.test.web.reactive.server.WebTestClient;
+import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec;
+
+/**
+ * In order to run this live test, start the following classes:
+ * - com.baeldung.debugging.server.ServerDebuggingApplication
+ * - com.baeldung.debugging.consumer.ConsumerDebuggingApplication
+ */
+public class ConsumerFooServiceLiveTest {
+
+ FooService service = new FooService();
+
+ private static final String BASE_URL = "http://localhost:8082";
+ private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on";
+ private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off";
+
+ private static WebTestClient client;
+
+ @BeforeAll
+ public static void setup() {
+ client = WebTestClient.bindToServer()
+ .baseUrl(BASE_URL)
+ .build();
+ }
+
+ @Test
+ public void whenRequestingDebugHookOn_thenObtainExpectedMessage() {
+ ResponseSpec response = client.get()
+ .uri(DEBUG_HOOK_ON)
+ .exchange();
+ response.expectStatus()
+ .isOk()
+ .expectBody(String.class)
+ .isEqualTo("DEBUG HOOK ON");
+ }
+
+ @Test
+ public void whenRequestingDebugHookOff_thenObtainExpectedMessage() {
+ ResponseSpec response = client.get()
+ .uri(DEBUG_HOOK_OFF)
+ .exchange();
+ response.expectStatus()
+ .isOk()
+ .expectBody(String.class)
+ .isEqualTo("DEBUG HOOK OFF");
+ }
+
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java
new file mode 100644
index 0000000000..fe8b04e824
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java
@@ -0,0 +1,25 @@
+package com.baeldung.reactive.debugging.consumer.utils;
+
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.AppenderBase;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class ListAppender extends AppenderBase {
+
+ static private List events = new ArrayList<>();
+
+ @Override
+ protected void append(ILoggingEvent eventObject) {
+ events.add(eventObject);
+ }
+
+ public static List getEvents() {
+ return events;
+ }
+
+ public static void clearEventList() {
+ events.clear();
+ }
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java
new file mode 100644
index 0000000000..e60d2e9b94
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java
@@ -0,0 +1,123 @@
+package com.baeldung.reactive.introduction;
+
+import org.junit.Test;
+import org.reactivestreams.Subscriber;
+import org.reactivestreams.Subscription;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import reactor.core.publisher.ConnectableFlux;
+import reactor.core.publisher.Flux;
+import reactor.core.scheduler.Schedulers;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+public class ReactorIntegrationTest {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(ReactorIntegrationTest.class);
+
+ @Test
+ public void givenFlux_whenSubscribing_thenStream() {
+
+ List elements = new ArrayList<>();
+
+ Flux.just(1, 2, 3, 4)
+ .log()
+ .map(i -> {
+ LOGGER.debug("{}:{}", i, Thread.currentThread());
+ return i * 2;
+ })
+ .subscribe(elements::add);
+
+ assertThat(elements).containsExactly(2, 4, 6, 8);
+ }
+
+ @Test
+ public void givenFlux_whenZipping_thenCombine() {
+ List elements = new ArrayList<>();
+
+ Flux.just(1, 2, 3, 4)
+ .log()
+ .map(i -> i * 2)
+ .zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two))
+ .subscribe(elements::add);
+
+ assertThat(elements).containsExactly(
+ "First Flux: 2, Second Flux: 0",
+ "First Flux: 4, Second Flux: 1",
+ "First Flux: 6, Second Flux: 2",
+ "First Flux: 8, Second Flux: 3");
+ }
+
+ @Test
+ public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() {
+
+ List elements = new ArrayList<>();
+
+ Flux.just(1, 2, 3, 4)
+ .log()
+ .subscribe(new Subscriber() {
+ private Subscription s;
+ int onNextAmount;
+
+ @Override
+ public void onSubscribe(final Subscription s) {
+ this.s = s;
+ s.request(2);
+ }
+
+ @Override
+ public void onNext(final Integer integer) {
+ elements.add(integer);
+ onNextAmount++;
+ if (onNextAmount % 2 == 0) {
+ s.request(2);
+ }
+ }
+
+ @Override
+ public void onError(final Throwable t) {
+ }
+
+ @Override
+ public void onComplete() {
+ }
+ });
+
+ assertThat(elements).containsExactly(1, 2, 3, 4);
+ }
+
+ @Test
+ public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException {
+ List threadNames = new ArrayList<>();
+
+ Flux.just(1, 2, 3, 4)
+ .log()
+ .map(i -> Thread.currentThread().getName())
+ .subscribeOn(Schedulers.parallel())
+ .subscribe(threadNames::add);
+
+ Thread.sleep(1000);
+
+ assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1");
+ }
+
+ @Test
+ public void givenConnectableFlux_whenConnected_thenShouldStream() {
+
+ List elements = new ArrayList<>();
+
+ final ConnectableFlux publish = Flux.just(1, 2, 3, 4).publish();
+
+ publish.subscribe(elements::add);
+
+ assertThat(elements).isEmpty();
+
+ publish.connect();
+
+ assertThat(elements).containsExactly(1, 2, 3, 4);
+ }
+
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java
new file mode 100644
index 0000000000..06644fbf77
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java
@@ -0,0 +1,37 @@
+package com.baeldung.reactive.security;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.ApplicationContext;
+import org.springframework.security.test.context.support.WithMockUser;
+import org.springframework.test.context.ContextConfiguration;
+import org.springframework.test.context.junit.jupiter.SpringExtension;
+import org.springframework.test.web.reactive.server.WebTestClient;
+
+@ExtendWith(SpringExtension.class)
+@ContextConfiguration(classes = SpringSecurity5Application.class)
+public class SecurityIntegrationTest {
+
+ @Autowired
+ ApplicationContext context;
+
+ private WebTestClient rest;
+
+ @BeforeEach
+ public void setup() {
+ this.rest = WebTestClient.bindToApplicationContext(this.context).configureClient().build();
+ }
+
+ @Test
+ public void whenNoCredentials_thenRedirectToLogin() {
+ this.rest.get().uri("/").exchange().expectStatus().is3xxRedirection();
+ }
+
+ @Test
+ @WithMockUser
+ public void whenHasCredentials_thenSeesGreeting() {
+ this.rest.get().uri("/").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Hello, user");
+ }
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java
new file mode 100644
index 0000000000..4a1fc4390a
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java
@@ -0,0 +1,12 @@
+package com.baeldung.reactive.webclient;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest(classes = WebClientApplication.class)
+public class SpringContextTest {
+
+ @Test
+ public void whenSpringContextIsBootstrapped_thenNoExceptions() {
+ }
+}
diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java
new file mode 100644
index 0000000000..28b4c19a10
--- /dev/null
+++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java
@@ -0,0 +1,325 @@
+package com.baeldung.reactive.webclient;
+
+import io.netty.channel.ChannelOption;
+import io.netty.handler.timeout.ReadTimeoutException;
+import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.handler.timeout.WriteTimeoutHandler;
+import org.junit.jupiter.api.Test;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.boot.test.context.SpringBootTest.WebEnvironment;
+import org.springframework.boot.web.server.LocalServerPort;
+import org.springframework.core.ParameterizedTypeReference;
+import org.springframework.core.codec.CodecException;
+import org.springframework.http.HttpHeaders;
+import org.springframework.http.HttpMethod;
+import org.springframework.http.HttpStatus;
+import org.springframework.http.MediaType;
+import org.springframework.http.ReactiveHttpOutputMessage;
+import org.springframework.http.client.reactive.ClientHttpRequest;
+import org.springframework.http.client.reactive.ReactorClientHttpConnector;
+import org.springframework.util.LinkedMultiValueMap;
+import org.springframework.util.MultiValueMap;
+import org.springframework.web.reactive.function.BodyInserter;
+import org.springframework.web.reactive.function.BodyInserters;
+import org.springframework.web.reactive.function.client.WebClient;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec;
+import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec;
+import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec;
+import org.springframework.web.reactive.function.client.WebClient.ResponseSpec;
+import org.springframework.web.reactive.function.client.WebClientRequestException;
+import reactor.core.publisher.Mono;
+import reactor.netty.http.client.HttpClient;
+import reactor.test.StepVerifier;
+
+import java.net.URI;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.ZonedDateTime;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+@SpringBootTest(classes = WebClientApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT)
+public class WebClientIntegrationTest {
+
+ @LocalServerPort
+ private int port;
+
+ private static final String BODY_VALUE = "bodyValue";
+ private static final ParameterizedTypeReference