diff --git a/pom.xml b/pom.xml index ad002650d7..3770c05b49 100644 --- a/pom.xml +++ b/pom.xml @@ -619,6 +619,7 @@ spring-5-reactive-security spring-5-webflux spring-5-webflux-2 + spring-reactive spring-activiti spring-akka @@ -1089,6 +1090,7 @@ spring-5-reactive-oauth spring-5-reactive-security spring-5-webflux + spring-reactive spring-activiti spring-akka diff --git a/spring-reactive/README.md b/spring-reactive/README.md new file mode 100644 index 0000000000..6e887fe2f8 --- /dev/null +++ b/spring-reactive/README.md @@ -0,0 +1,17 @@ +## Spring Reactive + +This module contains articles about Spring Reactor. + +## Relevant articles: + +- [Introduction to Project Reactor Bus](https://www.baeldung.com/reactor-bus) +- [Intro To Reactor Core](https://www.baeldung.com/reactor-core) +- [Debugging Reactive Streams in Java](https://www.baeldung.com/spring-debugging-reactive-streams) +- [Guide to Spring 5 WebFlux](https://www.baeldung.com/spring-webflux) +- [Introduction to the Functional Web Framework in Spring 5](https://www.baeldung.com/spring-5-functional-web) +- [Spring 5 WebClient](https://www.baeldung.com/spring-5-webclient) +- [Spring WebClient vs. RestTemplate](https://www.baeldung.com/spring-webclient-resttemplate) +- [https://www.baeldung.com/webflux-webclient-parameters](https://www.baeldung.com/webflux-webclient-parameters) +- [Handling Errors in Spring WebFlux](https://www.baeldung.com/spring-webflux-errors) +- [Spring Security 5 for Reactive Applications](https://www.baeldung.com/spring-security-5-reactive) +- [https://www.baeldung.com/spring-webflux-concurrency](https://www.baeldung.com/spring-webflux-concurrency) \ No newline at end of file diff --git a/spring-reactive/pom.xml b/spring-reactive/pom.xml new file mode 100644 index 0000000000..a4d375ea15 --- /dev/null +++ b/spring-reactive/pom.xml @@ -0,0 +1,66 @@ + + + 4.0.0 + + + com.baeldung + parent-boot-2 + 0.0.1-SNAPSHOT + ../parent-boot-2 + + + spring-reactive + + + + org.springframework.boot + spring-boot-starter-webflux + + + org.springframework.boot + spring-boot-starter-validation + + + org.springframework.boot + spring-boot-starter-security + + + io.reactivex.rxjava2 + rxjava + ${rxjava.version} + + + io.projectreactor.kafka + reactor-kafka + ${reactor-kafka.version} + + + org.springframework.boot + spring-boot-starter-data-mongodb-reactive + + + org.springframework.security + spring-security-test + test + + + io.projectreactor + reactor-test + ${reactor.version} + test + + + org.projectlombok + lombok + + + + + 3.4.12 + 1.2.2.RELEASE + 2.2.19 + + + \ No newline at end of file diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java new file mode 100644 index 0000000000..f34d930ef0 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Application.java @@ -0,0 +1,17 @@ +package com.baeldung.reactive.concurrency; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +/** +* Please note we assume Mongo and Kafka are running in the local machine and on default configuration. +* Additionally, if you want to experiment with Tomcat/Jetty instead of Netty, just uncomment the lines in pom.xml and rebuild. +*/ +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java new file mode 100644 index 0000000000..70928d4dca --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Controller.java @@ -0,0 +1,127 @@ +package com.baeldung.reactive.concurrency; + +import io.reactivex.Observable; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.IntegerDeserializer; +import org.apache.kafka.common.serialization.IntegerSerializer; +import org.apache.kafka.common.serialization.StringDeserializer; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; +import reactor.core.scheduler.Scheduler; +import reactor.core.scheduler.Schedulers; +import reactor.kafka.receiver.KafkaReceiver; +import reactor.kafka.receiver.ReceiverOptions; +import reactor.kafka.receiver.ReceiverRecord; +import reactor.kafka.sender.KafkaSender; +import reactor.kafka.sender.SenderOptions; +import reactor.kafka.sender.SenderRecord; + +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +@RestController +@RequestMapping("/") +public class Controller { + + @Autowired + private PersonRepository personRepository; + + private Scheduler scheduler = Schedulers.newBoundedElastic(5, 10, "MyThreadGroup"); + + private Logger logger = LoggerFactory.getLogger(Controller.class); + + @GetMapping("/threads/webflux") + public Flux getThreadsWebflux() { + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/threads/webclient") + public Flux getThreadsWebClient() { + WebClient.create("http://localhost:8080/index") + .get() + .retrieve() + .bodyToMono(String.class) + .subscribeOn(scheduler) + .publishOn(scheduler) + .doOnNext(s -> logger.info("Response: {}", s)) + .subscribe(); + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/threads/rxjava") + public Observable getIndexRxJava() { + Observable.fromIterable(Arrays.asList("Hello", "World")) + .map(s -> s.toUpperCase()) + .observeOn(io.reactivex.schedulers.Schedulers.trampoline()) + .doOnNext(s -> logger.info("String: {}", s)) + .subscribe(); + return Observable.fromIterable(getThreads()); + } + + @GetMapping("/threads/mongodb") + public Flux getIndexMongo() { + personRepository.findAll() + .doOnNext(p -> logger.info("Person: {}", p)) + .subscribe(); + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/threads/reactor-kafka") + public Flux getIndexKafka() { + Map producerProps = new HashMap<>(); + producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class); + producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); + SenderOptions senderOptions = SenderOptions.create(producerProps); + KafkaSender sender = KafkaSender.create(senderOptions); + Flux> outboundFlux = Flux.range(1, 10) + .map(i -> SenderRecord.create(new ProducerRecord<>("reactive-test", i, "Message_" + i), i)); + sender.send(outboundFlux) + .subscribe(); + + Map consumerProps = new HashMap<>(); + consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); + consumerProps.put(ConsumerConfig.CLIENT_ID_CONFIG, "my-consumer"); + consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group"); + consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class); + consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); + consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); + ReceiverOptions receiverOptions = ReceiverOptions.create(consumerProps); + receiverOptions.subscription(Collections.singleton("reactive-test")); + KafkaReceiver receiver = KafkaReceiver.create(receiverOptions); + Flux> inboundFlux = receiver.receive(); + inboundFlux.subscribe(r -> { + logger.info("Received message: {}", r.value()); + r.receiverOffset() + .acknowledge(); + }); + return Flux.fromIterable(getThreads()); + } + + @GetMapping("/index") + public Mono getIndex() { + return Mono.just("Hello world!"); + } + + private List getThreads() { + return Thread.getAllStackTraces() + .keySet() + .stream() + .map(t -> String.format("%-20s \t %s \t %d \t %s\n", t.getName(), t.getState(), t.getPriority(), t.isDaemon() ? "Daemon" : "Normal")) + .collect(Collectors.toList()); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java new file mode 100644 index 0000000000..10029330af --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/Person.java @@ -0,0 +1,27 @@ +package com.baeldung.reactive.concurrency; + +import org.springframework.data.annotation.Id; +import org.springframework.data.mongodb.core.mapping.Document; + +@Document +public class Person { + @Id + String id; + + public Person(String id) { + this.id = id; + } + + public String getId() { + return id; + } + + public void setId(String id) { + this.id = id; + } + + @Override + public String toString() { + return "Person{" + "id='" + id + '\'' + '}'; + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java new file mode 100644 index 0000000000..221ea3d74d --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/concurrency/PersonRepository.java @@ -0,0 +1,6 @@ +package com.baeldung.reactive.concurrency; + +import org.springframework.data.mongodb.repository.ReactiveMongoRepository; + +public interface PersonRepository extends ReactiveMongoRepository { +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java new file mode 100644 index 0000000000..fa10383c95 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/ConsumerDebuggingApplication.java @@ -0,0 +1,33 @@ +package com.baeldung.reactive.debugging.consumer; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.mongo.MongoReactiveAutoConfiguration; +import org.springframework.context.annotation.Bean; +import org.springframework.scheduling.annotation.EnableScheduling; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.web.server.SecurityWebFilterChain; +import reactor.core.publisher.Hooks; + +import java.util.Collections; + +@SpringBootApplication(exclude = MongoReactiveAutoConfiguration.class) +@EnableScheduling +public class ConsumerDebuggingApplication { + + public static void main(String[] args) { + Hooks.onOperatorDebug(); + SpringApplication app = new SpringApplication(ConsumerDebuggingApplication.class); + app.setDefaultProperties(Collections.singletonMap("server.port", "8082")); + app.run(args); + } + + @Bean + public SecurityWebFilterChain debuggingConsumerSpringSecurityFilterChain(ServerHttpSecurity http) { + http.authorizeExchange() + .anyExchange() + .permitAll(); + http.csrf().disable(); + return http.build(); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java new file mode 100644 index 0000000000..b58648ff9d --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/chronjobs/ChronJobs.java @@ -0,0 +1,152 @@ +package com.baeldung.reactive.debugging.consumer.chronjobs; + +import com.baeldung.reactive.debugging.consumer.model.Foo; +import com.baeldung.reactive.debugging.consumer.model.FooDto; +import com.baeldung.reactive.debugging.consumer.service.FooService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.scheduling.annotation.Scheduled; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +@Component +public class ChronJobs { + + private static Logger logger = LoggerFactory.getLogger(ChronJobs.class); + private WebClient client = WebClient.create("http://localhost:8081"); + + @Autowired + private FooService service; + + @Scheduled(fixedRate = 10000) + public void consumeInfiniteFlux() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 1 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + Integer random = ThreadLocalRandom.current() + .nextInt(0, 3); + switch (random) { + case 0: + logger.info("process 1 with approach 1"); + service.processFoo(fluxFoo); + break; + case 1: + logger.info("process 1 with approach 1 EH"); + service.processUsingApproachOneWithErrorHandling(fluxFoo); + break; + default: + logger.info("process 1 with approach 2"); + service.processFooInAnotherScenario(fluxFoo); + break; + + } + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFlux2() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 2 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + Integer random = ThreadLocalRandom.current() + .nextInt(0, 3); + switch (random) { + case 0: + logger.info("process 2 with approach 1"); + service.processFoo(fluxFoo); + break; + case 1: + logger.info("process 2 with approach 1 EH"); + service.processUsingApproachOneWithErrorHandling(fluxFoo); + break; + default: + logger.info("process 2 with approach 2"); + service.processFooInAnotherScenario(fluxFoo); + break; + + } + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFlux3() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 3 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + logger.info("process 3 with approach 3"); + service.processUsingApproachThree(fluxFoo); + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFluxWithCheckpoint4() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 4 with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + logger.info("process 4 with approach 4"); + service.processUsingApproachFourWithCheckpoint(fluxFoo); + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFluxWitParallelScheduler() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 5-parallel with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + logger.info("process 5-parallel with approach 5-parallel"); + service.processUsingApproachFivePublishingToDifferentParallelThreads(fluxFoo); + } + + @Scheduled(fixedRate = 20000) + public void consumeFiniteFluxWithSingleSchedulers() { + Flux fluxFoo = client.get() + .uri("/functional-reactive/periodic-foo-2") + .accept(MediaType.TEXT_EVENT_STREAM) + .retrieve() + .bodyToFlux(FooDto.class) + .delayElements(Duration.ofMillis(100)) + .map(dto -> { + logger.debug("process 5-single with dto id {} name{}", dto.getId(), dto.getName()); + return new Foo(dto); + }); + logger.info("process 5-single with approach 5-single"); + service.processUsingApproachFivePublishingToDifferentSingleThreads(fluxFoo); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java new file mode 100644 index 0000000000..df13113a82 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/controllers/ReactiveConfigsToggleRestController.java @@ -0,0 +1,22 @@ +package com.baeldung.reactive.debugging.consumer.controllers; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Hooks; + +@RestController +public class ReactiveConfigsToggleRestController { + + @GetMapping("/debug-hook-on") + public String setReactiveDebugOn() { + Hooks.onOperatorDebug(); + return "DEBUG HOOK ON"; + } + + @GetMapping("/debug-hook-off") + public String setReactiveDebugOff() { + Hooks.resetOnOperatorDebug(); + return "DEBUG HOOK OFF"; + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java new file mode 100644 index 0000000000..d20e2c9ba0 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/Foo.java @@ -0,0 +1,27 @@ +package com.baeldung.reactive.debugging.consumer.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +import java.util.concurrent.ThreadLocalRandom; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class Foo { + + private Integer id; + private String formattedName; + private Integer quantity; + + public Foo(FooDto dto) { + this.id = (ThreadLocalRandom.current() + .nextInt(0, 100) == 0) ? null : dto.getId(); + this.formattedName = dto.getName(); + this.quantity = ThreadLocalRandom.current() + .nextInt(0, 10); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java new file mode 100644 index 0000000000..bf6f614e18 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/model/FooDto.java @@ -0,0 +1,16 @@ +package com.baeldung.reactive.debugging.consumer.model; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; + +@Getter +@Setter +@NoArgsConstructor +@AllArgsConstructor +public class FooDto { + + private Integer id; + private String name; +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java new file mode 100644 index 0000000000..cdd9ca31a6 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooNameHelper.java @@ -0,0 +1,44 @@ +package com.baeldung.reactive.debugging.consumer.service; + +import com.baeldung.reactive.debugging.consumer.model.Foo; +import reactor.core.publisher.Flux; + +import java.util.concurrent.ThreadLocalRandom; + +public class FooNameHelper { + + public static Flux concatAndSubstringFooName(Flux flux) { + flux = concatFooName(flux); + flux = substringFooName(flux); + return flux; + } + + public static Flux concatFooName(Flux flux) { + flux = flux.map(foo -> { + String processedName = null; + Integer random = ThreadLocalRandom.current() + .nextInt(0, 80); + processedName = (random != 0) ? foo.getFormattedName() : foo.getFormattedName() + "-bael"; + foo.setFormattedName(processedName); + return foo; + }); + return flux; + } + + public static Flux substringFooName(Flux flux) { + return flux.map(foo -> { + String processedName; + Integer random = ThreadLocalRandom.current() + .nextInt(0, 100); + + processedName = (random == 0) ? foo.getFormattedName() + .substring(10, 15) + : foo.getFormattedName() + .substring(0, 5); + + foo.setFormattedName(processedName); + return foo; + }); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java new file mode 100644 index 0000000000..f4600b41b9 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooQuantityHelper.java @@ -0,0 +1,30 @@ +package com.baeldung.reactive.debugging.consumer.service; + +import com.baeldung.reactive.debugging.consumer.model.Foo; +import reactor.core.publisher.Flux; + +import java.util.concurrent.ThreadLocalRandom; + +public class FooQuantityHelper { + + public static Flux processFooReducingQuantity(Flux flux) { + flux = flux.map(foo -> { + Integer result; + Integer random = ThreadLocalRandom.current() + .nextInt(0, 90); + result = (random == 0) ? result = 0 : foo.getQuantity() + 2; + foo.setQuantity(result); + return foo; + }); + return divideFooQuantity(flux); + } + + public static Flux divideFooQuantity(Flux flux) { + return flux.map(foo -> { + Integer result = Math.round(5 / foo.getQuantity()); + foo.setQuantity(result); + return foo; + }); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java new file mode 100644 index 0000000000..1a8f9bc783 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooReporter.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.debugging.consumer.service; + +import com.baeldung.reactive.debugging.consumer.model.Foo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +public class FooReporter { + + private static Logger logger = LoggerFactory.getLogger(FooReporter.class); + + public static Flux reportResult(Flux input, String approach) { + return input.map(foo -> { + if (foo.getId() == null) + throw new IllegalArgumentException("Null id is not valid!"); + logger.info("Reporting for approach {}: Foo with id '{}' name '{}' and quantity '{}'", approach, foo.getId(), foo.getFormattedName(), foo.getQuantity()); + return foo; + }); + } + + public static Flux reportResult(Flux input) { + return reportResult(input, "default"); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java new file mode 100644 index 0000000000..bafaa3cfa0 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/consumer/service/FooService.java @@ -0,0 +1,118 @@ +package com.baeldung.reactive.debugging.consumer.service; + +import com.baeldung.reactive.debugging.consumer.model.Foo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.stereotype.Component; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.concatAndSubstringFooName; +import static com.baeldung.reactive.debugging.consumer.service.FooNameHelper.substringFooName; +import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.divideFooQuantity; +import static com.baeldung.reactive.debugging.consumer.service.FooQuantityHelper.processFooReducingQuantity; +import static com.baeldung.reactive.debugging.consumer.service.FooReporter.reportResult; + +@Component +public class FooService { + + private static Logger logger = LoggerFactory.getLogger(FooService.class); + + public void processFoo(Flux flux) { + flux = FooNameHelper.concatFooName(flux); + flux = FooNameHelper.substringFooName(flux); + flux = flux.log(); + flux = FooReporter.reportResult(flux); + flux = flux.doOnError(error -> { + logger.error("The following error happened on processFoo method!", error); + }); + flux.subscribe(); + } + + public void processFooInAnotherScenario(Flux flux) { + flux = FooNameHelper.substringFooName(flux); + flux = FooQuantityHelper.divideFooQuantity(flux); + flux.subscribe(); + } + + public void processUsingApproachOneWithErrorHandling(Flux flux) { + logger.info("starting approach one w error handling!"); + flux = concatAndSubstringFooName(flux); + flux = concatAndSubstringFooName(flux); + flux = substringFooName(flux); + flux = processFooReducingQuantity(flux); + flux = processFooReducingQuantity(flux); + flux = processFooReducingQuantity(flux); + flux = reportResult(flux, "ONE w/ EH"); + flux = flux.doOnError(error -> { + logger.error("Approach 1 with Error Handling failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachThree(Flux flux) { + logger.info("starting approach three!"); + flux = concatAndSubstringFooName(flux); + flux = reportResult(flux, "THREE"); + flux = flux.doOnError(error -> { + logger.error("Approach 3 failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachFourWithCheckpoint(Flux flux) { + logger.info("starting approach four!"); + flux = concatAndSubstringFooName(flux); + flux = flux.checkpoint("CHECKPOINT 1"); + flux = concatAndSubstringFooName(flux); + flux = divideFooQuantity(flux); + flux = flux.checkpoint("CHECKPOINT 2", true); + flux = reportResult(flux, "FOUR"); + flux = concatAndSubstringFooName(flux).doOnError(error -> { + logger.error("Approach 4 failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachFourWithInitialCheckpoint(Flux flux) { + logger.info("starting approach four!"); + flux = concatAndSubstringFooName(flux); + flux = flux.checkpoint("CHECKPOINT 1", true); + flux = concatAndSubstringFooName(flux); + flux = divideFooQuantity(flux); + flux = reportResult(flux, "FOUR"); + flux = flux.doOnError(error -> { + logger.error("Approach 4-2 failed!", error); + }); + flux.subscribe(); + } + + public void processUsingApproachFivePublishingToDifferentParallelThreads(Flux flux) { + logger.info("starting approach five-parallel!"); + flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newParallel("five-parallel-foo")) + .log(); + flux = concatAndSubstringFooName(flux); + flux = divideFooQuantity(flux); + flux = reportResult(flux, "FIVE-PARALLEL").publishOn(Schedulers.newSingle("five-parallel-bar")); + flux = concatAndSubstringFooName(flux).doOnError(error -> { + logger.error("Approach 5-parallel failed!", error); + }); + flux.subscribeOn(Schedulers.newParallel("five-parallel-starter")) + .subscribe(); + } + + public void processUsingApproachFivePublishingToDifferentSingleThreads(Flux flux) { + logger.info("starting approach five-single!"); + flux = flux.log() + .subscribeOn(Schedulers.newSingle("five-single-starter")); + flux = concatAndSubstringFooName(flux).publishOn(Schedulers.newSingle("five-single-foo")); + flux = concatAndSubstringFooName(flux); + flux = divideFooQuantity(flux); + flux = reportResult(flux, "FIVE-SINGLE").publishOn(Schedulers.newSingle("five-single-bar")); + flux = concatAndSubstringFooName(flux).doOnError(error -> { + logger.error("Approach 5-single failed!", error); + }); + flux.subscribe(); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java new file mode 100644 index 0000000000..c9838705b5 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/ServerDebuggingApplication.java @@ -0,0 +1,29 @@ +package com.baeldung.reactive.debugging.server; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.web.server.SecurityWebFilterChain; +import org.springframework.web.reactive.config.EnableWebFlux; + +import java.util.Collections; + +@EnableWebFlux +@SpringBootApplication +public class ServerDebuggingApplication { + + public static void main(String[] args) { + SpringApplication app = new SpringApplication(ServerDebuggingApplication.class); + app.setDefaultProperties(Collections.singletonMap("server.port", "8081")); + app.run(args); + } + + @Bean + public SecurityWebFilterChain debuggingServerSpringSecurityFilterChain(ServerHttpSecurity http) { + http.authorizeExchange() + .anyExchange() + .permitAll(); + return http.build(); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java new file mode 100644 index 0000000000..15f9a4b786 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/handlers/ServerHandler.java @@ -0,0 +1,45 @@ +package com.baeldung.reactive.debugging.server.handlers; + +import com.baeldung.reactive.debugging.server.model.Foo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.util.concurrent.ThreadLocalRandom; + +@Component +public class ServerHandler { + + private static Logger logger = LoggerFactory.getLogger(ServerHandler.class); + + public Mono useHandler(final ServerRequest request) { + // there are chances that something goes wrong here... + return ServerResponse.ok() + .contentType(MediaType.TEXT_EVENT_STREAM) + .body(Flux.interval(Duration.ofSeconds(1)) + .map(sequence -> { + logger.info("retrieving Foo. Sequence: {}", sequence); + if (ThreadLocalRandom.current() + .nextInt(0, 50) == 1) { + throw new RuntimeException("There was an error retrieving the Foo!"); + } + return new Foo(sequence, "name" + sequence); + + }), Foo.class); + } + + public Mono useHandlerFinite(final ServerRequest request) { + return ServerResponse.ok() + .contentType(MediaType.TEXT_EVENT_STREAM) + .body(Flux.range(0, 50) + .map(sequence -> { + return new Foo(new Long(sequence), "theFooNameNumber" + sequence); + }), Foo.class); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java new file mode 100644 index 0000000000..2c419a23f8 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/model/Foo.java @@ -0,0 +1,13 @@ +package com.baeldung.reactive.debugging.server.model; + +import lombok.AllArgsConstructor; +import lombok.Data; + +@Data +@AllArgsConstructor +public class Foo { + + private Long id; + private String name; + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java new file mode 100644 index 0000000000..5db2ab92b6 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/debugging/server/routers/ServerRouter.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.debugging.server.routers; + +import com.baeldung.reactive.debugging.server.handlers.ServerHandler; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +@Configuration +public class ServerRouter { + + @Bean + public RouterFunction responseRoute(@Autowired ServerHandler handler) { + return RouterFunctions.route(RequestPredicates.GET("/functional-reactive/periodic-foo"), handler::useHandler) + .andRoute(RequestPredicates.GET("/functional-reactive/periodic-foo-2"), handler::useHandlerFinite); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java new file mode 100644 index 0000000000..a22f04a8d2 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/ErrorHandlingApplication.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.errorhandling; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.web.server.SecurityWebFilterChain; + +@SpringBootApplication +public class ErrorHandlingApplication { + + public static void main(String[] args) { + SpringApplication.run(ErrorHandlingApplication.class, args); + } + + @Bean + public SecurityWebFilterChain securityWebFilterChain(ServerHttpSecurity http) { + http.authorizeExchange() + .anyExchange() + .permitAll(); + http.csrf().disable(); + return http.build(); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java new file mode 100644 index 0000000000..0a96a8593c --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorAttributes.java @@ -0,0 +1,52 @@ +package com.baeldung.reactive.errorhandling; + +import org.springframework.boot.web.error.ErrorAttributeOptions; +import org.springframework.boot.web.reactive.error.DefaultErrorAttributes; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; + +import java.util.Map; + +@Component +public class GlobalErrorAttributes extends DefaultErrorAttributes{ + + private HttpStatus status = HttpStatus.BAD_REQUEST; + private String message = "please provide a name"; + + @Override + public Map getErrorAttributes(ServerRequest request, ErrorAttributeOptions options) { + Map map = super.getErrorAttributes(request, options); + map.put("status", getStatus()); + map.put("message", getMessage()); + return map; + } + + /** + * @return the status + */ + public HttpStatus getStatus() { + return status; + } + + /** + * @param status the status to set + */ + public void setStatus(HttpStatus status) { + this.status = status; + } + + /** + * @return the message + */ + public String getMessage() { + return message; + } + + /** + * @param message the message to set + */ + public void setMessage(String message) { + this.message = message; + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java new file mode 100644 index 0000000000..24583308cd --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/GlobalErrorWebExceptionHandler.java @@ -0,0 +1,48 @@ +package com.baeldung.reactive.errorhandling; + +import org.springframework.boot.autoconfigure.web.WebProperties; +import org.springframework.boot.autoconfigure.web.reactive.error.AbstractErrorWebExceptionHandler; +import org.springframework.boot.web.error.ErrorAttributeOptions; +import org.springframework.boot.web.reactive.error.ErrorAttributes; +import org.springframework.context.ApplicationContext; +import org.springframework.core.annotation.Order; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.codec.ServerCodecConfigurer; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +import java.util.Map; + +@Component +@Order(-2) +public class GlobalErrorWebExceptionHandler extends AbstractErrorWebExceptionHandler { + + public GlobalErrorWebExceptionHandler(GlobalErrorAttributes g, ApplicationContext applicationContext, + ServerCodecConfigurer serverCodecConfigurer) { + super(g, new WebProperties.Resources(), applicationContext); + super.setMessageWriters(serverCodecConfigurer.getWriters()); + super.setMessageReaders(serverCodecConfigurer.getReaders()); + } + + @Override + protected RouterFunction getRoutingFunction(final ErrorAttributes errorAttributes) { + return RouterFunctions.route(RequestPredicates.all(), this::renderErrorResponse); + } + + private Mono renderErrorResponse(final ServerRequest request) { + + final Map errorPropertiesMap = getErrorAttributes(request, ErrorAttributeOptions.defaults()); + + return ServerResponse.status(HttpStatus.BAD_REQUEST) + .contentType(MediaType.APPLICATION_JSON) + .body(BodyInserters.fromValue(errorPropertiesMap)); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java new file mode 100644 index 0000000000..bdc7771b80 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/NameRequiredException.java @@ -0,0 +1,11 @@ +package com.baeldung.reactive.errorhandling; + +import org.springframework.http.HttpStatus; +import org.springframework.web.server.ResponseStatusException; + +public class NameRequiredException extends ResponseStatusException { + + public NameRequiredException(HttpStatus status, String message, Throwable e) { + super(status, message, e); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java new file mode 100644 index 0000000000..32f2f1c3a2 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler1.java @@ -0,0 +1,28 @@ +package com.baeldung.reactive.errorhandling.handlers; + +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Component +public class Handler1 { + + public Mono handleRequest1(ServerRequest request) { + return sayHello(request).onErrorReturn("Hello, Stranger") + .flatMap(s -> ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(s)); + } + + private Mono sayHello(ServerRequest request) { + try { + return Mono.just("Hello, " + request.queryParam("name") + .get()); + } catch (Exception e) { + return Mono.error(e); + } + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java new file mode 100644 index 0000000000..093120c92b --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler2.java @@ -0,0 +1,37 @@ +package com.baeldung.reactive.errorhandling.handlers; + +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Component +public class Handler2 { + +public Mono handleRequest2(ServerRequest request) { + return + sayHello(request) + .flatMap(s -> ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(s)) + .onErrorResume(e -> sayHelloFallback() + .flatMap(s -> ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(s))); +} + + private Mono sayHello(ServerRequest request) { + try { + return Mono.just("Hello, " + request.queryParam("name") + .get()); + } catch (Exception e) { + return Mono.error(e); + } + } + + private Mono sayHelloFallback() { + return Mono.just("Hello, Stranger"); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java new file mode 100644 index 0000000000..44842e0539 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler3.java @@ -0,0 +1,33 @@ +package com.baeldung.reactive.errorhandling.handlers; + +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Component +public class Handler3 { + + public Mono handleRequest3(ServerRequest request) { + return + sayHello(request) + .flatMap(s -> ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(s)) + .onErrorResume(e -> (Mono.just("Hi, I looked around for your name but found: " + + e.getMessage())).flatMap(s -> ServerResponse.ok() + .contentType(MediaType.TEXT_PLAIN) + .bodyValue(s))); + } + + private Mono sayHello(ServerRequest request) { + try { + return Mono.just("Hello, " + request.queryParam("name") + .get()); + } catch (Exception e) { + return Mono.error(e); + } + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java new file mode 100644 index 0000000000..2d391a42a7 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler4.java @@ -0,0 +1,29 @@ +package com.baeldung.reactive.errorhandling.handlers; + +import com.baeldung.reactive.errorhandling.NameRequiredException; +import org.springframework.http.HttpStatus; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Component +public class Handler4 { + +public Mono handleRequest4(ServerRequest request) { + return ServerResponse.ok() + .body(sayHello(request) + .onErrorResume(e -> + Mono.error(new NameRequiredException( + HttpStatus.BAD_REQUEST, "please provide a name", e))), String.class); +} + + private Mono sayHello(ServerRequest request) { + try { + return Mono.just("Hello, " + request.queryParam("name").get()); + } catch (Exception e) { + return Mono.error(e); + } + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java new file mode 100644 index 0000000000..a466982865 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/handlers/Handler5.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.errorhandling.handlers; + +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.ServerRequest; +import org.springframework.web.reactive.function.server.ServerResponse; +import reactor.core.publisher.Mono; + +@Component +public class Handler5 { + + public Mono handleRequest5(ServerRequest request) { + return ServerResponse.ok() + .body(sayHello(request), String.class); + + } + + private Mono sayHello(ServerRequest request) { + return Mono.just("Hello, " + request.queryParam("name").get()); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java new file mode 100644 index 0000000000..caf779b456 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router1.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.errorhandling.routers; + +import com.baeldung.reactive.errorhandling.handlers.Handler1; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +@Component +public class Router1 { + + @Bean + public RouterFunction routeRequest1(Handler1 handler) { + return RouterFunctions.route(RequestPredicates.GET("/api/endpoint1") + .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest1); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java new file mode 100644 index 0000000000..b965257c30 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router2.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.errorhandling.routers; + +import com.baeldung.reactive.errorhandling.handlers.Handler2; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +@Component +public class Router2 { + + @Bean + public RouterFunction routeRequest2(Handler2 handler) { + return RouterFunctions.route(RequestPredicates.GET("/api/endpoint2") + .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest2); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java new file mode 100644 index 0000000000..b8f7f983cc --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router3.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.errorhandling.routers; + +import com.baeldung.reactive.errorhandling.handlers.Handler3; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +@Component +public class Router3 { + + @Bean + public RouterFunction routeRequest3(Handler3 handler) { + return RouterFunctions.route(RequestPredicates.GET("/api/endpoint3") + .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest3); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java new file mode 100644 index 0000000000..03c65fec67 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router4.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.errorhandling.routers; + +import com.baeldung.reactive.errorhandling.handlers.Handler4; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +@Component +public class Router4 { + + @Bean + public RouterFunction routeRequest4(Handler4 handler) { + return RouterFunctions.route(RequestPredicates.GET("/api/endpoint4") + .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest4); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java new file mode 100644 index 0000000000..c68e04659f --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/errorhandling/routers/Router5.java @@ -0,0 +1,21 @@ +package com.baeldung.reactive.errorhandling.routers; + +import com.baeldung.reactive.errorhandling.handlers.Handler5; +import org.springframework.context.annotation.Bean; +import org.springframework.http.MediaType; +import org.springframework.stereotype.Component; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; + +@Component +public class Router5 { + + @Bean + public RouterFunction routeRequest5(Handler5 handler) { + return RouterFunctions.route(RequestPredicates.GET("/api/endpoint5") + .and(RequestPredicates.accept(MediaType.TEXT_PLAIN)), handler::handleRequest5); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java new file mode 100644 index 0000000000..99b79d88ea --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetController.java @@ -0,0 +1,37 @@ +package com.baeldung.reactive.security; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +import java.security.Principal; + +@RestController +public class GreetController { + + private GreetService greetService; + + public GreetController(GreetService greetService) { + this.greetService = greetService; + } + + @GetMapping("/") + public Mono greet(Mono principal) { + return principal + .map(Principal::getName) + .map(name -> String.format("Hello, %s", name)); + } + + @GetMapping("/admin") + public Mono greetAdmin(Mono principal) { + return principal + .map(Principal::getName) + .map(name -> String.format("Admin access: %s", name)); + } + + @GetMapping("/greetService") + public Mono greetService() { + return greetService.greet(); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java new file mode 100644 index 0000000000..93df64bced --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/GreetService.java @@ -0,0 +1,15 @@ +package com.baeldung.reactive.security; + +import org.springframework.security.access.prepost.PreAuthorize; +import org.springframework.stereotype.Service; +import reactor.core.publisher.Mono; + +@Service +public class GreetService { + + @PreAuthorize("hasRole('ADMIN')") + public Mono greet() { + return Mono.just("Hello from service!"); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java new file mode 100644 index 0000000000..bb2f2d50e1 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/SecurityConfig.java @@ -0,0 +1,55 @@ +package com.baeldung.reactive.security; + +import org.springframework.context.annotation.Bean; +import org.springframework.security.config.annotation.method.configuration.EnableReactiveMethodSecurity; +import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.core.userdetails.MapReactiveUserDetailsService; +import org.springframework.security.core.userdetails.User; +import org.springframework.security.core.userdetails.UserDetails; +import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; +import org.springframework.security.crypto.password.PasswordEncoder; +import org.springframework.security.web.server.SecurityWebFilterChain; + +@EnableWebFluxSecurity +@EnableReactiveMethodSecurity +public class SecurityConfig { + + @Bean + public SecurityWebFilterChain securitygWebFilterChain(ServerHttpSecurity http) { + return http.authorizeExchange() + .pathMatchers("/admin") + .hasAuthority("ROLE_ADMIN") + .anyExchange() + .authenticated() + .and() + .formLogin() + .and() + .csrf() + .disable() + .build(); + } + + @Bean + public MapReactiveUserDetailsService userDetailsService() { + UserDetails user = User + .withUsername("user") + .password(passwordEncoder().encode("password")) + .roles("USER") + .build(); + + UserDetails admin = User + .withUsername("admin") + .password(passwordEncoder().encode("password")) + .roles("ADMIN") + .build(); + + return new MapReactiveUserDetailsService(user, admin); + } + + @Bean + public PasswordEncoder passwordEncoder() { + return new BCryptPasswordEncoder(); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java b/spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java new file mode 100644 index 0000000000..bc0895a38b --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/security/SpringSecurity5Application.java @@ -0,0 +1,34 @@ +package com.baeldung.reactive.security; + +import org.springframework.context.ApplicationContext; +import org.springframework.context.annotation.AnnotationConfigApplicationContext; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.ComponentScan; +import org.springframework.http.server.reactive.HttpHandler; +import org.springframework.http.server.reactive.ReactorHttpHandlerAdapter; +import org.springframework.web.reactive.config.EnableWebFlux; +import org.springframework.web.server.adapter.WebHttpHandlerBuilder; +import reactor.netty.DisposableServer; +import reactor.netty.http.server.HttpServer; + +@ComponentScan(basePackages = {"com.baeldung.reactive.security"}) +@EnableWebFlux +public class SpringSecurity5Application { + + public static void main(String[] args) { + try (AnnotationConfigApplicationContext context = + new AnnotationConfigApplicationContext(SpringSecurity5Application.class)) { + context.getBean(DisposableServer.class).onDispose().block(); + } + } + + @Bean + public DisposableServer disposableServer(ApplicationContext context) { + HttpHandler handler = WebHttpHandlerBuilder.applicationContext(context) + .build(); + ReactorHttpHandlerAdapter adapter = new ReactorHttpHandlerAdapter(handler); + HttpServer httpServer = HttpServer.create().host("localhost").port(8083); + return httpServer.handle(adapter).bindNow(); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java new file mode 100644 index 0000000000..a58d672686 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Foo.java @@ -0,0 +1,24 @@ +package com.baeldung.reactive.webclient; + +public class Foo { + + private String name; + + public Foo() { + super(); + } + + public Foo(String name) { + super(); + this.name = name; + } + + public String getName() { + return name; + } + + public void setName(String name) { + this.name = name; + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java new file mode 100644 index 0000000000..4ef1aecabb --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/Tweet.java @@ -0,0 +1,13 @@ +package com.baeldung.reactive.webclient; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@NoArgsConstructor +@AllArgsConstructor +public class Tweet { + private String text; + private String username; +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java new file mode 100644 index 0000000000..42ff603b87 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/TweetsSlowServiceController.java @@ -0,0 +1,20 @@ +package com.baeldung.reactive.webclient; + +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; + +import java.util.Arrays; +import java.util.List; + +@RestController +public class TweetsSlowServiceController { + + @GetMapping("/slow-service-tweets") + private List getAllTweets() throws Exception { + Thread.sleep(2000L); // delay + return Arrays.asList( + new Tweet("RestTemplate rules", "@user1"), + new Tweet("WebClient is better", "@user2"), + new Tweet("OK, both are useful", "@user1")); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java new file mode 100644 index 0000000000..d2a5b6a024 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientApplication.java @@ -0,0 +1,14 @@ +package com.baeldung.reactive.webclient; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.boot.autoconfigure.security.reactive.ReactiveSecurityAutoConfiguration; + +@SpringBootApplication(exclude = { ReactiveSecurityAutoConfiguration.class }) +public class WebClientApplication { + + public static void main(String[] args) { + SpringApplication.run(WebClientApplication.class, args); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java new file mode 100644 index 0000000000..0a83b1a1b7 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebClientController.java @@ -0,0 +1,41 @@ +package com.baeldung.reactive.webclient; + +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestPart; +import org.springframework.web.bind.annotation.ResponseStatus; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.Map; + +@RestController +public class WebClientController { + + @ResponseStatus(HttpStatus.OK) + @GetMapping("/resource") + public Map getResource() { + Map response = new HashMap<>(); + response.put("field", "value"); + return response; + } + + @PostMapping("/resource") + public Mono postStringResource(@RequestBody Mono bodyString) { + return bodyString.map(body -> "processed-" + body); + } + + @PostMapping("/resource-foo") + public Mono postFooResource(@RequestBody Mono bodyFoo) { + return bodyFoo.map(foo -> "processedFoo-" + foo.getName()); + } + + @PostMapping(value = "/resource-multipart", consumes = MediaType.MULTIPART_FORM_DATA_VALUE) + public String handleFormUpload(@RequestPart("key1") String value1, @RequestPart("key2") String value2) { + return "processed-" + value1 + '-' + value2; + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java new file mode 100644 index 0000000000..30b3ccc959 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclient/WebController.java @@ -0,0 +1,60 @@ +package com.baeldung.reactive.webclient; + +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.http.HttpMethod; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.RestController; +import org.springframework.web.client.RestTemplate; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; + +import java.util.List; + +@Slf4j +@RestController +public class WebController { + + private static final int DEFAULT_PORT = 8080; + + @Setter + private int serverPort = DEFAULT_PORT; + + @GetMapping("/tweets-blocking") + public List getTweetsBlocking() { + log.info("Starting BLOCKING Controller!"); + final String uri = getSlowServiceUri(); + + RestTemplate restTemplate = new RestTemplate(); + ResponseEntity> response = restTemplate.exchange( + uri, HttpMethod.GET, null, + new ParameterizedTypeReference>(){}); + + List result = response.getBody(); + result.forEach(tweet -> log.info(tweet.toString())); + log.info("Exiting BLOCKING Controller!"); + return result; + } + + @GetMapping(value = "/tweets-non-blocking", produces = MediaType.TEXT_EVENT_STREAM_VALUE) + public Flux getTweetsNonBlocking() { + log.info("Starting NON-BLOCKING Controller!"); + Flux tweetFlux = WebClient.create() + .get() + .uri(getSlowServiceUri()) + .retrieve() + .bodyToFlux(Tweet.class); + + tweetFlux.subscribe(tweet -> log.info(tweet.toString())); + log.info("Exiting NON-BLOCKING Controller!"); + return tweetFlux; + } + + private String getSlowServiceUri() { + return "http://localhost:" + serverPort + "/slow-service-tweets"; + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java b/spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java new file mode 100644 index 0000000000..d43fe43a77 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webclientrequests/SpringWebClientRequestsApp.java @@ -0,0 +1,12 @@ +package com.baeldung.reactive.webclientrequests; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class SpringWebClientRequestsApp { + + public static void main(String[] args) { + SpringApplication.run(SpringWebClientRequestsApp.class, args); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java new file mode 100644 index 0000000000..d41a4f2791 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/Employee.java @@ -0,0 +1,17 @@ +package com.baeldung.reactive.webflux; + +import lombok.AllArgsConstructor; +import lombok.Data; +import lombok.NoArgsConstructor; + +@Data +@AllArgsConstructor +@NoArgsConstructor +public class Employee { + + private String id; + private String name; + + // standard getters and setters + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java new file mode 100644 index 0000000000..8a57f5c510 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/EmployeeRepository.java @@ -0,0 +1,59 @@ +package com.baeldung.reactive.webflux; + +import com.baeldung.reactive.webflux.Employee; +import org.springframework.stereotype.Repository; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.HashMap; +import java.util.Map; + +@Repository +public class EmployeeRepository { + + static Map employeeData; + + static Map employeeAccessData; + + static { + employeeData = new HashMap<>(); + employeeData.put("1", new Employee("1", "Employee 1")); + employeeData.put("2", new Employee("2", "Employee 2")); + employeeData.put("3", new Employee("3", "Employee 3")); + employeeData.put("4", new Employee("4", "Employee 4")); + employeeData.put("5", new Employee("5", "Employee 5")); + employeeData.put("6", new Employee("6", "Employee 6")); + employeeData.put("7", new Employee("7", "Employee 7")); + employeeData.put("8", new Employee("8", "Employee 8")); + employeeData.put("9", new Employee("9", "Employee 9")); + employeeData.put("10", new Employee("10", "Employee 10")); + + employeeAccessData = new HashMap<>(); + employeeAccessData.put("1", "Employee 1 Access Key"); + employeeAccessData.put("2", "Employee 2 Access Key"); + employeeAccessData.put("3", "Employee 3 Access Key"); + employeeAccessData.put("4", "Employee 4 Access Key"); + employeeAccessData.put("5", "Employee 5 Access Key"); + employeeAccessData.put("6", "Employee 6 Access Key"); + employeeAccessData.put("7", "Employee 7 Access Key"); + employeeAccessData.put("8", "Employee 8 Access Key"); + employeeAccessData.put("9", "Employee 9 Access Key"); + employeeAccessData.put("10", "Employee 10 Access Key"); + } + + public Mono findEmployeeById(String id) { + return Mono.just(employeeData.get(id)); + } + + public Flux findAllEmployees() { + return Flux.fromIterable(employeeData.values()); + } + + public Mono updateEmployee(Employee employee) { + Employee existingEmployee = employeeData.get(employee.getId()); + if (existingEmployee != null) { + existingEmployee.setName(employee.getName()); + } + return Mono.just(existingEmployee); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java new file mode 100644 index 0000000000..23aacfdd95 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeController.java @@ -0,0 +1,39 @@ +package com.baeldung.reactive.webflux.annotation; + +import com.baeldung.reactive.webflux.Employee; +import com.baeldung.reactive.webflux.EmployeeRepository; +import org.springframework.web.bind.annotation.GetMapping; +import org.springframework.web.bind.annotation.PathVariable; +import org.springframework.web.bind.annotation.PostMapping; +import org.springframework.web.bind.annotation.RequestBody; +import org.springframework.web.bind.annotation.RequestMapping; +import org.springframework.web.bind.annotation.RestController; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +@RestController +@RequestMapping("/employees") +public class EmployeeController { + + private final EmployeeRepository employeeRepository; + + public EmployeeController(EmployeeRepository employeeRepository) { + this.employeeRepository = employeeRepository; + } + + @GetMapping("/{id}") + private Mono getEmployeeById(@PathVariable String id) { + return employeeRepository.findEmployeeById(id); + } + + @GetMapping + private Flux getAllEmployees() { + return employeeRepository.findAllEmployees(); + } + + @PostMapping("/update") + private Mono updateEmployee(@RequestBody Employee employee) { + return employeeRepository.updateEmployee(employee); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java new file mode 100644 index 0000000000..1a2ada96e9 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeSpringApplication.java @@ -0,0 +1,16 @@ +package com.baeldung.reactive.webflux.annotation; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class EmployeeSpringApplication { + + public static void main(String[] args) { + SpringApplication.run(EmployeeSpringApplication.class, args); + + EmployeeWebClient employeeWebClient = new EmployeeWebClient(); + employeeWebClient.consume(); + } + +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java new file mode 100644 index 0000000000..611a261a1b --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebClient.java @@ -0,0 +1,32 @@ +package com.baeldung.reactive.webflux.annotation; + +import com.baeldung.reactive.webflux.Employee; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.web.reactive.function.client.WebClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +public class EmployeeWebClient { + + private static final Logger LOGGER = LoggerFactory.getLogger(EmployeeWebClient.class); + + WebClient client = WebClient.create("http://localhost:8080"); + + public void consume() { + + Mono employeeMono = client.get() + .uri("/employees/{id}", "1") + .retrieve() + .bodyToMono(Employee.class); + + employeeMono.subscribe(employee -> LOGGER.debug("Employee: {}", employee)); + + Flux employeeFlux = client.get() + .uri("/employees") + .retrieve() + .bodyToFlux(Employee.class); + + employeeFlux.subscribe(employee -> LOGGER.debug("Employee: {}", employee)); + } +} \ No newline at end of file diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java new file mode 100644 index 0000000000..8dfa455ce3 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/annotation/EmployeeWebSecurityConfig.java @@ -0,0 +1,45 @@ +package com.baeldung.reactive.webflux.annotation; + +import org.springframework.context.annotation.Bean; +import org.springframework.http.HttpMethod; +import org.springframework.security.config.annotation.web.reactive.EnableWebFluxSecurity; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.core.userdetails.MapReactiveUserDetailsService; +import org.springframework.security.core.userdetails.User; +import org.springframework.security.core.userdetails.UserDetails; +import org.springframework.security.crypto.bcrypt.BCryptPasswordEncoder; +import org.springframework.security.crypto.password.PasswordEncoder; +import org.springframework.security.web.server.SecurityWebFilterChain; + +@EnableWebFluxSecurity +public class EmployeeWebSecurityConfig { + + @Bean + public MapReactiveUserDetailsService userDetailsService() { + UserDetails user = User + .withUsername("admin") + .password(passwordEncoder().encode("password")) + .roles("ADMIN") + .build(); + return new MapReactiveUserDetailsService(user); + } + + @Bean + public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { + http.csrf() + .disable() + .authorizeExchange() + .pathMatchers(HttpMethod.POST, "/employees/update") + .hasRole("ADMIN") + .pathMatchers("/**") + .permitAll() + .and() + .httpBasic(); + return http.build(); + } + + @Bean + public PasswordEncoder passwordEncoder() { + return new BCryptPasswordEncoder(); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java new file mode 100644 index 0000000000..f97d40e4e7 --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeFunctionalConfig.java @@ -0,0 +1,74 @@ +package com.baeldung.reactive.webflux.functional; + +import com.baeldung.reactive.webflux.Employee; +import com.baeldung.reactive.webflux.EmployeeRepository; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.security.config.web.server.ServerHttpSecurity; +import org.springframework.security.web.server.SecurityWebFilterChain; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.ServerResponse; + +import static org.springframework.web.reactive.function.BodyExtractors.toMono; +import static org.springframework.web.reactive.function.server.RequestPredicates.GET; +import static org.springframework.web.reactive.function.server.RequestPredicates.POST; +import static org.springframework.web.reactive.function.server.RouterFunctions.route; +import static org.springframework.web.reactive.function.server.ServerResponse.ok; + +@Configuration +public class EmployeeFunctionalConfig { + + @Bean + EmployeeRepository employeeRepository() { + return new EmployeeRepository(); + } + + @Bean + RouterFunction getAllEmployeesRoute() { + return route(GET("/employees"), + req -> ok().body( + employeeRepository().findAllEmployees(), Employee.class)); + } + + @Bean + RouterFunction getEmployeeByIdRoute() { + return route(GET("/employees/{id}"), + req -> ok().body( + employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class)); + } + + @Bean + RouterFunction updateEmployeeRoute() { + return route(POST("/employees/update"), + req -> req.body(toMono(Employee.class)) + .doOnNext(employeeRepository()::updateEmployee) + .then(ok().build())); + } + + @Bean + RouterFunction composedRoutes() { + return + route(GET("/employees"), + req -> ok().body( + employeeRepository().findAllEmployees(), Employee.class)) + + .and(route(GET("/employees/{id}"), + req -> ok().body( + employeeRepository().findEmployeeById(req.pathVariable("id")), Employee.class))) + + .and(route(POST("/employees/update"), + req -> req.body(toMono(Employee.class)) + .doOnNext(employeeRepository()::updateEmployee) + .then(ok().build()))); + } + + @Bean + public SecurityWebFilterChain springSecurityFilterChain(ServerHttpSecurity http) { + http.csrf() + .disable() + .authorizeExchange() + .anyExchange() + .permitAll(); + return http.build(); + } +} diff --git a/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java new file mode 100644 index 0000000000..f710cf44eb --- /dev/null +++ b/spring-reactive/src/main/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalApplication.java @@ -0,0 +1,13 @@ +package com.baeldung.reactive.webflux.functional; + +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; + +@SpringBootApplication +public class EmployeeSpringFunctionalApplication { + + public static void main(String[] args) { + SpringApplication.run(EmployeeSpringFunctionalApplication.class, args); + } + +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java new file mode 100644 index 0000000000..5cb1a69fbd --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceIntegrationTest.java @@ -0,0 +1,62 @@ +package com.baeldung.reactive.debugging.consumer; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.classic.spi.IThrowableProxy; +import com.baeldung.reactive.debugging.consumer.model.Foo; +import com.baeldung.reactive.debugging.consumer.service.FooService; +import com.baeldung.reactive.debugging.consumer.utils.ListAppender; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Hooks; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ConsumerFooServiceIntegrationTest { + + FooService service = new FooService(); + + @BeforeEach + public void clearLogList() { + Hooks.onOperatorDebug(); + ListAppender.clearEventList(); + } + + @Test + public void givenFooWithNullId_whenProcessFoo_thenLogsWithDebugTrace() { + Foo one = new Foo(1, "nameverylong", 8); + Foo two = new Foo(null, "nameverylong", 4); + Flux flux = Flux.just(one, two); + + service.processFoo(flux); + + Collection allLoggedEntries = ListAppender.getEvents() + .stream() + .map(ILoggingEvent::getFormattedMessage) + .collect(Collectors.toList()); + + Collection allSuppressedEntries = ListAppender.getEvents() + .stream() + .map(ILoggingEvent::getThrowableProxy) + .flatMap(t -> { + return Optional.ofNullable(t) + .map(IThrowableProxy::getSuppressed) + .map(Arrays::stream) + .orElse(Stream.empty()); + }) + .map(IThrowableProxy::getClassName) + .collect(Collectors.toList()); + assertThat(allLoggedEntries).anyMatch(entry -> entry.contains("The following error happened on processFoo method!")) + .anyMatch(entry -> entry.contains("| onSubscribe")) + .anyMatch(entry -> entry.contains("| cancel()")); + + assertThat(allSuppressedEntries) + .anyMatch(entry -> entry.contains("reactor.core.publisher.FluxOnAssembly$OnAssemblyException")); + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java new file mode 100644 index 0000000000..c3ef67f534 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/ConsumerFooServiceLiveTest.java @@ -0,0 +1,53 @@ +package com.baeldung.reactive.debugging.consumer; + +import com.baeldung.reactive.debugging.consumer.service.FooService; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.Test; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.test.web.reactive.server.WebTestClient.ResponseSpec; + +/** + * In order to run this live test, start the following classes: + * - com.baeldung.debugging.server.ServerDebuggingApplication + * - com.baeldung.debugging.consumer.ConsumerDebuggingApplication + */ +public class ConsumerFooServiceLiveTest { + + FooService service = new FooService(); + + private static final String BASE_URL = "http://localhost:8082"; + private static final String DEBUG_HOOK_ON = BASE_URL + "/debug-hook-on"; + private static final String DEBUG_HOOK_OFF = BASE_URL + "/debug-hook-off"; + + private static WebTestClient client; + + @BeforeAll + public static void setup() { + client = WebTestClient.bindToServer() + .baseUrl(BASE_URL) + .build(); + } + + @Test + public void whenRequestingDebugHookOn_thenObtainExpectedMessage() { + ResponseSpec response = client.get() + .uri(DEBUG_HOOK_ON) + .exchange(); + response.expectStatus() + .isOk() + .expectBody(String.class) + .isEqualTo("DEBUG HOOK ON"); + } + + @Test + public void whenRequestingDebugHookOff_thenObtainExpectedMessage() { + ResponseSpec response = client.get() + .uri(DEBUG_HOOK_OFF) + .exchange(); + response.expectStatus() + .isOk() + .expectBody(String.class) + .isEqualTo("DEBUG HOOK OFF"); + } + +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java new file mode 100644 index 0000000000..fe8b04e824 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/debugging/consumer/utils/ListAppender.java @@ -0,0 +1,25 @@ +package com.baeldung.reactive.debugging.consumer.utils; + +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.AppenderBase; + +import java.util.ArrayList; +import java.util.List; + +public class ListAppender extends AppenderBase { + + static private List events = new ArrayList<>(); + + @Override + protected void append(ILoggingEvent eventObject) { + events.add(eventObject); + } + + public static List getEvents() { + return events; + } + + public static void clearEventList() { + events.clear(); + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java new file mode 100644 index 0000000000..e60d2e9b94 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/introduction/ReactorIntegrationTest.java @@ -0,0 +1,123 @@ +package com.baeldung.reactive.introduction; + +import org.junit.Test; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.ConnectableFlux; +import reactor.core.publisher.Flux; +import reactor.core.scheduler.Schedulers; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +public class ReactorIntegrationTest { + + private static final Logger LOGGER = LoggerFactory.getLogger(ReactorIntegrationTest.class); + + @Test + public void givenFlux_whenSubscribing_thenStream() { + + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> { + LOGGER.debug("{}:{}", i, Thread.currentThread()); + return i * 2; + }) + .subscribe(elements::add); + + assertThat(elements).containsExactly(2, 4, 6, 8); + } + + @Test + public void givenFlux_whenZipping_thenCombine() { + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> i * 2) + .zipWith(Flux.range(0, Integer.MAX_VALUE).log(), (one, two) -> String.format("First Flux: %d, Second Flux: %d", one, two)) + .subscribe(elements::add); + + assertThat(elements).containsExactly( + "First Flux: 2, Second Flux: 0", + "First Flux: 4, Second Flux: 1", + "First Flux: 6, Second Flux: 2", + "First Flux: 8, Second Flux: 3"); + } + + @Test + public void givenFlux_whenApplyingBackPressure_thenPushElementsInBatches() { + + List elements = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .subscribe(new Subscriber() { + private Subscription s; + int onNextAmount; + + @Override + public void onSubscribe(final Subscription s) { + this.s = s; + s.request(2); + } + + @Override + public void onNext(final Integer integer) { + elements.add(integer); + onNextAmount++; + if (onNextAmount % 2 == 0) { + s.request(2); + } + } + + @Override + public void onError(final Throwable t) { + } + + @Override + public void onComplete() { + } + }); + + assertThat(elements).containsExactly(1, 2, 3, 4); + } + + @Test + public void givenFlux_whenInParallel_thenSubscribeInDifferentThreads() throws InterruptedException { + List threadNames = new ArrayList<>(); + + Flux.just(1, 2, 3, 4) + .log() + .map(i -> Thread.currentThread().getName()) + .subscribeOn(Schedulers.parallel()) + .subscribe(threadNames::add); + + Thread.sleep(1000); + + assertThat(threadNames).containsExactly("parallel-1", "parallel-1", "parallel-1", "parallel-1"); + } + + @Test + public void givenConnectableFlux_whenConnected_thenShouldStream() { + + List elements = new ArrayList<>(); + + final ConnectableFlux publish = Flux.just(1, 2, 3, 4).publish(); + + publish.subscribe(elements::add); + + assertThat(elements).isEmpty(); + + publish.connect(); + + assertThat(elements).containsExactly(1, 2, 3, 4); + } + +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java new file mode 100644 index 0000000000..06644fbf77 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/security/SecurityIntegrationTest.java @@ -0,0 +1,37 @@ +package com.baeldung.reactive.security; + +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.context.ApplicationContext; +import org.springframework.security.test.context.support.WithMockUser; +import org.springframework.test.context.ContextConfiguration; +import org.springframework.test.context.junit.jupiter.SpringExtension; +import org.springframework.test.web.reactive.server.WebTestClient; + +@ExtendWith(SpringExtension.class) +@ContextConfiguration(classes = SpringSecurity5Application.class) +public class SecurityIntegrationTest { + + @Autowired + ApplicationContext context; + + private WebTestClient rest; + + @BeforeEach + public void setup() { + this.rest = WebTestClient.bindToApplicationContext(this.context).configureClient().build(); + } + + @Test + public void whenNoCredentials_thenRedirectToLogin() { + this.rest.get().uri("/").exchange().expectStatus().is3xxRedirection(); + } + + @Test + @WithMockUser + public void whenHasCredentials_thenSeesGreeting() { + this.rest.get().uri("/").exchange().expectStatus().isOk().expectBody(String.class).isEqualTo("Hello, user"); + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java new file mode 100644 index 0000000000..4a1fc4390a --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/SpringContextTest.java @@ -0,0 +1,12 @@ +package com.baeldung.reactive.webclient; + +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; + +@SpringBootTest(classes = WebClientApplication.class) +public class SpringContextTest { + + @Test + public void whenSpringContextIsBootstrapped_thenNoExceptions() { + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java new file mode 100644 index 0000000000..28b4c19a10 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebClientIntegrationTest.java @@ -0,0 +1,325 @@ +package com.baeldung.reactive.webclient; + +import io.netty.channel.ChannelOption; +import io.netty.handler.timeout.ReadTimeoutException; +import io.netty.handler.timeout.ReadTimeoutHandler; +import io.netty.handler.timeout.WriteTimeoutHandler; +import org.junit.jupiter.api.Test; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.context.SpringBootTest.WebEnvironment; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.core.ParameterizedTypeReference; +import org.springframework.core.codec.CodecException; +import org.springframework.http.HttpHeaders; +import org.springframework.http.HttpMethod; +import org.springframework.http.HttpStatus; +import org.springframework.http.MediaType; +import org.springframework.http.ReactiveHttpOutputMessage; +import org.springframework.http.client.reactive.ClientHttpRequest; +import org.springframework.http.client.reactive.ReactorClientHttpConnector; +import org.springframework.util.LinkedMultiValueMap; +import org.springframework.util.MultiValueMap; +import org.springframework.web.reactive.function.BodyInserter; +import org.springframework.web.reactive.function.BodyInserters; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.reactive.function.client.WebClient.RequestBodySpec; +import org.springframework.web.reactive.function.client.WebClient.RequestBodyUriSpec; +import org.springframework.web.reactive.function.client.WebClient.RequestHeadersSpec; +import org.springframework.web.reactive.function.client.WebClient.RequestHeadersUriSpec; +import org.springframework.web.reactive.function.client.WebClient.ResponseSpec; +import org.springframework.web.reactive.function.client.WebClientRequestException; +import reactor.core.publisher.Mono; +import reactor.netty.http.client.HttpClient; +import reactor.test.StepVerifier; + +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.time.Duration; +import java.time.ZonedDateTime; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.TimeUnit; + +import static org.assertj.core.api.Assertions.assertThat; + +@SpringBootTest(classes = WebClientApplication.class, webEnvironment = WebEnvironment.RANDOM_PORT) +public class WebClientIntegrationTest { + + @LocalServerPort + private int port; + + private static final String BODY_VALUE = "bodyValue"; + private static final ParameterizedTypeReference> MAP_RESPONSE_REF = new ParameterizedTypeReference>() { + }; + + @Test + public void givenDifferentWebClientCreationMethods_whenUsed_thenObtainExpectedResponse() { + // WebClient creation + WebClient client1 = WebClient.create(); + WebClient client2 = WebClient.create("http://localhost:" + port); + WebClient client3 = WebClient.builder() + .baseUrl("http://localhost:" + port) + .defaultCookie("cookieKey", "cookieValue") + .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .defaultUriVariables(Collections.singletonMap("url", "http://localhost:8080")) + .build(); + + // response assertions + StepVerifier.create(retrieveResponse(client1.post() + .uri("http://localhost:" + port + "/resource"))) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(client2)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(client3)) + .expectNext("processed-bodyValue") + .verifyComplete(); + // assert response without specifying URI + StepVerifier.create(retrieveResponse(client1)) + .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ex.getMessage() + .contains("Connection refused")) + .verify(); + } + + @Test + public void givenDifferentMethodSpecifications_whenUsed_thenObtainExpectedResponse() { + // request specification + RequestBodyUriSpec uriSpecPost1 = createDefaultClient().method(HttpMethod.POST); + RequestBodyUriSpec uriSpecPost2 = createDefaultClient().post(); + RequestHeadersUriSpec requestGet = createDefaultClient().get(); + + // response assertions + StepVerifier.create(retrieveResponse(uriSpecPost1)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(uriSpecPost2)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveGetResponse(requestGet)) + .expectNextMatches(nextMap -> nextMap.get("field") + .equals("value")) + .verifyComplete(); + } + + @Test + public void givenDifferentUriSpecifications_whenUsed_thenObtainExpectedResponse() { + // uri specification + RequestBodySpec bodySpecUsingString = createDefaultPostRequest().uri("/resource"); + RequestBodySpec bodySpecUsingUriBuilder = createDefaultPostRequest().uri(uriBuilder -> uriBuilder.pathSegment("resource") + .build()); + RequestBodySpec bodySpecusingURI = createDefaultPostRequest().uri(URI.create("http://localhost:" + port + "/resource")); + RequestBodySpec bodySpecOverridenBaseUri = createDefaultPostRequest().uri(URI.create("/resource")); + RequestBodySpec bodySpecOverridenBaseUri2 = WebClient.builder() + .baseUrl("http://localhost:" + port) + .build() + .post() + .uri(URI.create("/resource")); + + // response assertions + StepVerifier.create(retrieveResponse(bodySpecUsingString)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(bodySpecUsingUriBuilder)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(bodySpecusingURI)) + .expectNext("processed-bodyValue") + .verifyComplete(); + // assert sending request overriding base URI + StepVerifier.create(retrieveResponse(bodySpecOverridenBaseUri)) + .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ex.getMessage() + .contains("Connection refused")) + .verify(); + StepVerifier.create(retrieveResponse(bodySpecOverridenBaseUri2)) + .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ex.getMessage() + .contains("Connection refused")) + .verify(); + } + + @Test + public void givenDifferentBodySpecifications_whenUsed_thenObtainExpectedResponse() { + // request body specifications + RequestHeadersSpec headersSpecPost1 = createDefaultPostResourceRequest().body(BodyInserters.fromPublisher(Mono.just(BODY_VALUE), String.class)); + RequestHeadersSpec headersSpecPost2 = createDefaultPostResourceRequest().body(BodyInserters.fromValue(BODY_VALUE)); + RequestHeadersSpec headersSpecPost3 = createDefaultPostResourceRequest().bodyValue(BODY_VALUE); + RequestHeadersSpec headersSpecFooPost = createDefaultPostRequest().uri("/resource-foo") + .body(Mono.just(new Foo("fooName")), Foo.class); + BodyInserter inserterPlainObject = BodyInserters.fromValue(new Object()); + RequestHeadersSpec headersSpecPlainObject = createDefaultPostResourceRequest().body(inserterPlainObject); + + // request body specifications - using other inserter method (multipart request) + LinkedMultiValueMap map = new LinkedMultiValueMap<>(); + map.add("key1", "multipartValue1"); + map.add("key2", "multipartValue2"); + BodyInserter, ClientHttpRequest> inserterMultipart = BodyInserters.fromMultipartData(map); + RequestHeadersSpec headersSpecInserterMultipart = createDefaultPostRequest().uri("/resource-multipart") + .body(inserterMultipart); + + // response assertions + StepVerifier.create(retrieveResponse(headersSpecPost1)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(headersSpecPost2)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(headersSpecPost3)) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(retrieveResponse(headersSpecFooPost)) + .expectNext("processedFoo-fooName") + .verifyComplete(); + StepVerifier.create(retrieveResponse(headersSpecInserterMultipart)) + .expectNext("processed-multipartValue1-multipartValue2") + .verifyComplete(); + // assert error plain `new Object()` as request body + StepVerifier.create(retrieveResponse(headersSpecPlainObject)) + .expectError(CodecException.class) + .verify(); + // assert response for request with no body + Mono> responsePostWithNoBody = createDefaultPostResourceRequest().exchangeToMono(responseHandler -> { + assertThat(responseHandler.statusCode()).isEqualTo(HttpStatus.BAD_REQUEST); + return responseHandler.bodyToMono(MAP_RESPONSE_REF); + }); + StepVerifier.create(responsePostWithNoBody) + .expectNextMatches(nextMap -> nextMap.get("error") + .equals("Bad Request")) + .verifyComplete(); + } + + @Test + public void givenPostSpecifications_whenHeadersAdded_thenObtainExpectedResponse() { + // request header specification + RequestHeadersSpec headersSpecInserterStringWithHeaders = createDefaultPostResourceRequestResponse().header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE) + .accept(MediaType.APPLICATION_JSON, MediaType.APPLICATION_XML) + .acceptCharset(StandardCharsets.UTF_8) + .ifNoneMatch("*") + .ifModifiedSince(ZonedDateTime.now()); + + // response assertions + StepVerifier.create(retrieveResponse(headersSpecInserterStringWithHeaders)) + .expectNext("processed-bodyValue") + .verifyComplete(); + } + + @Test + public void givenDifferentResponseSpecifications_whenUsed_thenObtainExpectedResponse() { + ResponseSpec responseSpecPostString = createDefaultPostResourceRequestResponse().retrieve(); + Mono responsePostString = responseSpecPostString.bodyToMono(String.class); + Mono responsePostString2 = createDefaultPostResourceRequestResponse().exchangeToMono(response -> { + if (response.statusCode() == HttpStatus.OK) { + return response.bodyToMono(String.class); + } else if (response.statusCode() + .is4xxClientError()) { + return Mono.just("Error response"); + } else { + return response.createException() + .flatMap(Mono::error); + } + }); + Mono responsePostNoBody = createDefaultPostResourceRequest().exchangeToMono(response -> { + if (response.statusCode() == HttpStatus.OK) { + return response.bodyToMono(String.class); + } else if (response.statusCode() + .is4xxClientError()) { + return Mono.just("Error response"); + } else { + return response.createException() + .flatMap(Mono::error); + } + }); + Mono> responseGet = createDefaultClient().get() + .uri("/resource") + .retrieve() + .bodyToMono(MAP_RESPONSE_REF); + + // response assertions + StepVerifier.create(responsePostString) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(responsePostString2) + .expectNext("processed-bodyValue") + .verifyComplete(); + StepVerifier.create(responsePostNoBody) + .expectNext("Error response") + .verifyComplete(); + StepVerifier.create(responseGet) + .expectNextMatches(nextMap -> nextMap.get("field") + .equals("value")) + .verifyComplete(); + } + + @Test + public void givenWebClientWithTimeoutConfigurations_whenRequestUsingWronglyConfiguredPublisher_thenObtainTimeout() { + HttpClient httpClient = HttpClient.create() + .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 1000) + .responseTimeout(Duration.ofMillis(1000)) + .doOnConnected(conn -> conn.addHandlerLast(new ReadTimeoutHandler(1000, TimeUnit.MILLISECONDS)) + .addHandlerLast(new WriteTimeoutHandler(1000, TimeUnit.MILLISECONDS))); + + WebClient timeoutClient = WebClient.builder() + .baseUrl("http://localhost:" + port) + .clientConnector(new ReactorClientHttpConnector(httpClient)) + .build(); + + RequestHeadersSpec neverendingMonoBodyRequest = timeoutClient.post() + .uri("/resource") + .body(Mono.never(), String.class); + + StepVerifier.create(neverendingMonoBodyRequest.retrieve() + .bodyToMono(String.class)) + .expectErrorMatches(ex -> WebClientRequestException.class.isAssignableFrom(ex.getClass()) && ReadTimeoutException.class.isAssignableFrom(ex.getCause() + .getClass())) + .verify(); + } + + // helper methods to create default instances + private WebClient createDefaultClient() { + return WebClient.create("http://localhost:" + port); + } + + private RequestBodyUriSpec createDefaultPostRequest() { + return createDefaultClient().post(); + } + + private RequestBodySpec createDefaultPostResourceRequest() { + return createDefaultPostRequest().uri("/resource"); + } + + private RequestHeadersSpec createDefaultPostResourceRequestResponse() { + return createDefaultPostResourceRequest().bodyValue(BODY_VALUE); + } + + // helper methods to retrieve a response based on different steps of the process (specs) + private Mono retrieveResponse(WebClient client) { + return client.post() + .uri("/resource") + .bodyValue(BODY_VALUE) + .retrieve() + .bodyToMono(String.class); + } + + private Mono retrieveResponse(RequestBodyUriSpec spec) { + return spec.uri("/resource") + .bodyValue(BODY_VALUE) + .retrieve() + .bodyToMono(String.class); + } + + private Mono> retrieveGetResponse(RequestHeadersUriSpec spec) { + return spec.uri("/resource") + .retrieve() + .bodyToMono(MAP_RESPONSE_REF); + } + + private Mono retrieveResponse(RequestBodySpec spec) { + return spec.bodyValue(BODY_VALUE) + .retrieve() + .bodyToMono(String.class); + } + + private Mono retrieveResponse(RequestHeadersSpec spec) { + return spec.retrieve() + .bodyToMono(String.class); + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebControllerIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebControllerIntegrationTest.java new file mode 100644 index 0000000000..3c0683a419 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebControllerIntegrationTest.java @@ -0,0 +1,51 @@ +package com.baeldung.reactive.webclient; + +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = WebClientApplication.class) +public class WebControllerIntegrationTest { + + @LocalServerPort + int randomServerPort; + + @Autowired + private WebTestClient testClient; + + @Autowired + private WebController webController; + + @Before + public void setup() { + webController.setServerPort(randomServerPort); + } + + @Test + public void whenEndpointWithBlockingClientIsCalled_thenThreeTweetsAreReceived() { + testClient.get() + .uri("/tweets-blocking") + .exchange() + .expectStatus() + .isOk() + .expectBodyList(Tweet.class) + .hasSize(3); + } + + @Test + public void whenEndpointWithNonBlockingClientIsCalled_thenThreeTweetsAreReceived() { + testClient.get() + .uri("/tweets-non-blocking") + .exchange() + .expectStatus() + .isOk() + .expectBodyList(Tweet.class) + .hasSize(3); + } +} \ No newline at end of file diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebTestClientIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebTestClientIntegrationTest.java new file mode 100644 index 0000000000..90f4a0eca2 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclient/WebTestClientIntegrationTest.java @@ -0,0 +1,98 @@ +package com.baeldung.reactive.webclient; + +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.web.server.LocalServerPort; +import org.springframework.context.ApplicationContext; +import org.springframework.security.test.context.support.WithMockUser; +import org.springframework.test.web.reactive.server.WebTestClient; +import org.springframework.web.reactive.function.server.RequestPredicates; +import org.springframework.web.reactive.function.server.RouterFunction; +import org.springframework.web.reactive.function.server.RouterFunctions; +import org.springframework.web.reactive.function.server.ServerResponse; +import org.springframework.web.server.WebHandler; +import reactor.core.publisher.Mono; + +@SpringBootTest(classes = WebClientApplication.class, webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +public class WebTestClientIntegrationTest { + + @LocalServerPort + private int port; + + @Autowired + private ApplicationContext context; + + @Autowired + private WebClientController controller; + + private final RouterFunction ROUTER_FUNCTION = RouterFunctions.route(RequestPredicates.GET("/resource"), request -> ServerResponse.ok() + .build()); + private final WebHandler WEB_HANDLER = exchange -> Mono.empty(); + + @Test + public void testWebTestClientWithServerWebHandler() { + WebTestClient.bindToWebHandler(WEB_HANDLER) + .build(); + } + + @Test + public void testWebTestClientWithRouterFunction() { + WebTestClient.bindToRouterFunction(ROUTER_FUNCTION) + .build() + .get() + .uri("/resource") + .exchange() + .expectStatus() + .isOk() + .expectBody() + .isEmpty(); + } + + @Test + @WithMockUser + public void testWebTestClientWithServerURL() { + WebTestClient.bindToServer() + .baseUrl("http://localhost:" + port) + .build() + .get() + .uri("/resource") + .exchange() + .expectStatus() + .isOk() + .expectBody() + .jsonPath("field") + .isEqualTo("value"); + ; + } + + @Test + @WithMockUser + public void testWebTestClientWithApplicationContext() { + WebTestClient.bindToApplicationContext(context) + .build() + .get() + .uri("/resource") + .exchange() + .expectStatus() + .isOk() + .expectBody() + .jsonPath("field") + .isEqualTo("value"); + } + + @Test + public void testWebTestClientWithController() { + WebTestClient.bindToController(controller) + .build() + .get() + .uri("/resource") + .exchange() + .expectStatus() + .isOk() + .expectBody() + .jsonPath("field") + .isEqualTo("value"); + } + +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webclientrequests/WebClientRequestsUnitTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webclientrequests/WebClientRequestsUnitTest.java new file mode 100644 index 0000000000..ff59f12391 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webclientrequests/WebClientRequestsUnitTest.java @@ -0,0 +1,176 @@ +package com.baeldung.reactive.webclientrequests; + +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Captor; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.springframework.boot.test.autoconfigure.web.reactive.WebFluxTest; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.web.reactive.function.client.ClientRequest; +import org.springframework.web.reactive.function.client.ClientResponse; +import org.springframework.web.reactive.function.client.ExchangeFunction; +import org.springframework.web.reactive.function.client.WebClient; +import org.springframework.web.util.DefaultUriBuilderFactory; +import reactor.core.publisher.Mono; + +import java.time.Duration; + +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +@RunWith(SpringRunner.class) +@WebFluxTest +public class WebClientRequestsUnitTest { + + private static final String BASE_URL = "https://example.com"; + + private WebClient webClient; + + @Captor + private ArgumentCaptor argumentCaptor; + + private ExchangeFunction exchangeFunction; + + @Before + public void init() { + MockitoAnnotations.initMocks(this); + this.exchangeFunction = mock(ExchangeFunction.class); + ClientResponse mockResponse = mock(ClientResponse.class); + when(this.exchangeFunction.exchange(this.argumentCaptor.capture())).thenReturn(Mono.just(mockResponse)); + this.webClient = WebClient + .builder() + .baseUrl(BASE_URL) + .exchangeFunction(exchangeFunction) + .build(); + } + + @Test + public void whenCallSimpleURI_thenURIMatched() { + this.webClient.get() + .uri("/products") + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products"); + } + + @Test + public void whenCallSinglePathSegmentUri_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/{id}") + .build(2)) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/2"); + } + + @Test + public void whenCallMultiplePathSegmentsUri_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/{id}/attributes/{attributeId}") + .build(2, 13)) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/2/attributes/13"); + } + + @Test + public void whenCallSingleQueryParams_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/") + .queryParam("name", "AndroidPhone") + .queryParam("color", "black") + .queryParam("deliveryDate", "13/04/2019") + .build()) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"); + } + + @Test + public void whenCallSingleQueryParamsPlaceholders_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/") + .queryParam("name", "{title}") + .queryParam("color", "{authorId}") + .queryParam("deliveryDate", "{date}") + .build("AndroidPhone", "black", "13/04/2019")) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13%2F04%2F2019"); + } + + @Test + public void whenCallArrayQueryParamsBrackets_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/") + .queryParam("tag[]", "Snapdragon", "NFC") + .build()) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/?tag%5B%5D=Snapdragon&tag%5B%5D=NFC"); + } + + + @Test + public void whenCallArrayQueryParams_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/") + .queryParam("category", "Phones", "Tablets") + .build()) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/?category=Phones&category=Tablets"); + } + + @Test + public void whenCallArrayQueryParamsComma_thenURIMatched() { + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/") + .queryParam("category", String.join(",", "Phones", "Tablets")) + .build()) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/?category=Phones,Tablets"); + } + + @Test + public void whenUriComponentEncoding_thenQueryParamsNotEscaped() { + DefaultUriBuilderFactory factory = new DefaultUriBuilderFactory(BASE_URL); + factory.setEncodingMode(DefaultUriBuilderFactory.EncodingMode.URI_COMPONENT); + this.webClient = WebClient + .builder() + .uriBuilderFactory(factory) + .baseUrl(BASE_URL) + .exchangeFunction(exchangeFunction) + .build(); + this.webClient.get() + .uri(uriBuilder -> uriBuilder + .path("/products/") + .queryParam("name", "AndroidPhone") + .queryParam("color", "black") + .queryParam("deliveryDate", "13/04/2019") + .build()) + .exchange() + .block(Duration.ofSeconds(1)); + verifyCalledUrl("/products/?name=AndroidPhone&color=black&deliveryDate=13/04/2019"); + } + + private void verifyCalledUrl(String relativeUrl) { + ClientRequest request = this.argumentCaptor.getValue(); + Assert.assertEquals(String.format("%s%s", BASE_URL, relativeUrl), request.url().toString()); + Mockito.verify(this.exchangeFunction).exchange(request); + verifyNoMoreInteractions(this.exchangeFunction); + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webflux/annotation/EmployeeControllerIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webflux/annotation/EmployeeControllerIntegrationTest.java new file mode 100644 index 0000000000..24c4303e83 --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webflux/annotation/EmployeeControllerIntegrationTest.java @@ -0,0 +1,74 @@ +package com.baeldung.reactive.webflux.annotation; + +import com.baeldung.reactive.webflux.Employee; +import com.baeldung.reactive.webflux.EmployeeRepository; +import com.baeldung.reactive.webflux.annotation.EmployeeSpringApplication; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.ArrayList; +import java.util.List; + +import static org.mockito.BDDMockito.given; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes= EmployeeSpringApplication.class) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class EmployeeControllerIntegrationTest { + + @Autowired + private WebTestClient testClient; + + @MockBean + private EmployeeRepository employeeRepository; + + @Test + public void givenEmployeeId_whenGetEmployeeById_thenCorrectEmployee() { + + Employee employee = new Employee("1", "Employee 1 Name"); + + given(employeeRepository.findEmployeeById("1")).willReturn(Mono.just(employee)); + testClient.get() + .uri("/employees/1") + .exchange() + .expectStatus() + .isOk() + .expectBody(Employee.class) + .isEqualTo(employee); + } + + @Test + public void whenGetAllEmployees_thenCorrectEmployees() { + + List employeeList = new ArrayList<>(); + + Employee employee1 = new Employee("1", "Employee 1 Name"); + Employee employee2 = new Employee("2", "Employee 2 Name"); + Employee employee3 = new Employee("3", "Employee 3 Name"); + + employeeList.add(employee1); + employeeList.add(employee2); + employeeList.add(employee3); + + Flux employeeFlux = Flux.fromIterable(employeeList); + + given(employeeRepository.findAllEmployees()).willReturn(employeeFlux); + testClient.get() + .uri("/employees") + .exchange() + .expectStatus() + .isOk() + .expectBodyList(Employee.class) + .hasSize(3) + .isEqualTo(employeeList); + } +} diff --git a/spring-reactive/src/test/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalIntegrationTest.java b/spring-reactive/src/test/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalIntegrationTest.java new file mode 100644 index 0000000000..da866724cc --- /dev/null +++ b/spring-reactive/src/test/java/com/baeldung/reactive/webflux/functional/EmployeeSpringFunctionalIntegrationTest.java @@ -0,0 +1,92 @@ +package com.baeldung.reactive.webflux.functional; + +import com.baeldung.reactive.webflux.Employee; +import com.baeldung.reactive.webflux.EmployeeRepository; +import org.junit.FixMethodOrder; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.MethodSorters; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.boot.test.mock.mockito.MockBean; +import org.springframework.test.context.junit4.SpringRunner; +import org.springframework.test.web.reactive.server.WebTestClient; +import reactor.core.publisher.Flux; +import reactor.core.publisher.Mono; + +import java.util.Arrays; +import java.util.List; + +import static org.mockito.BDDMockito.given; +import static org.mockito.Mockito.verify; + +@RunWith(SpringRunner.class) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT, classes = EmployeeSpringFunctionalApplication.class) +@FixMethodOrder(MethodSorters.NAME_ASCENDING) +public class EmployeeSpringFunctionalIntegrationTest { + + @Autowired + private EmployeeFunctionalConfig config; + + @MockBean + private EmployeeRepository employeeRepository; + + @Test + public void givenEmployeeId_whenGetEmployeeById_thenCorrectEmployee() { + WebTestClient client = WebTestClient + .bindToRouterFunction(config.getEmployeeByIdRoute()) + .build(); + + Employee employee = new Employee("1", "Employee 1"); + + given(employeeRepository.findEmployeeById("1")).willReturn(Mono.just(employee)); + + client.get() + .uri("/employees/1") + .exchange() + .expectStatus() + .isOk() + .expectBody(Employee.class) + .isEqualTo(employee); + } + + @Test + public void whenGetAllEmployees_thenCorrectEmployees() { + WebTestClient client = WebTestClient + .bindToRouterFunction(config.getAllEmployeesRoute()) + .build(); + + List employees = Arrays.asList( + new Employee("1", "Employee 1"), + new Employee("2", "Employee 2")); + + Flux employeeFlux = Flux.fromIterable(employees); + given(employeeRepository.findAllEmployees()).willReturn(employeeFlux); + + client.get() + .uri("/employees") + .exchange() + .expectStatus() + .isOk() + .expectBodyList(Employee.class) + .isEqualTo(employees); + } + + @Test + public void whenUpdateEmployee_thenEmployeeUpdated() { + WebTestClient client = WebTestClient + .bindToRouterFunction(config.updateEmployeeRoute()) + .build(); + + Employee employee = new Employee("1", "Employee 1 Updated"); + + client.post() + .uri("/employees/update") + .body(Mono.just(employee), Employee.class) + .exchange() + .expectStatus() + .isOk(); + + verify(employeeRepository).updateEmployee(employee); + } +} diff --git a/spring-reactive/src/test/resources/logback-test.xml b/spring-reactive/src/test/resources/logback-test.xml new file mode 100644 index 0000000000..8d4771e308 --- /dev/null +++ b/spring-reactive/src/test/resources/logback-test.xml @@ -0,0 +1,12 @@ + + + + + [%d{ISO8601}]-[%thread] %-5level %logger - %msg%n + + + + + + + \ No newline at end of file